STREAMS-186:Delete unneeded Configurator classes, this closes 
apache/incubator-streams#304


Project: http://git-wip-us.apache.org/repos/asf/incubator-streams/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-streams/commit/b273496c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-streams/tree/b273496c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-streams/diff/b273496c

Branch: refs/heads/master
Commit: b273496c966235fae6a57e38bd47bf811a8b50f4
Parents: 48d54c2
Author: smarthi <[email protected]>
Authored: Fri Oct 21 14:55:00 2016 -0400
Committer: smarthi <[email protected]>
Committed: Fri Oct 21 14:55:00 2016 -0400

----------------------------------------------------------------------
 .../components/http/HttpConfigurator.java       |  75 -----
 .../persist/SimpleHTTPPostPersistWriter.java    |  31 +-
 .../http/processor/SimpleHTTPGetProcessor.java  |  28 +-
 .../http/processor/SimpleHTTPPostProcessor.java |  34 +-
 .../http/provider/SimpleHTTPGetProvider.java    |  64 ----
 .../http/provider/SimpleHttpProvider.java       |  32 +-
 .../streams/config/StreamsConfigurator.java     |   2 -
 .../org/apache/streams/s3/S3Configurator.java   |  76 -----
 .../ElasticsearchConfigurator.java              |  77 -----
 .../ElasticsearchPersistWriter.java             |  37 ++-
 .../elasticsearch/ElasticsearchQuery.java       |  17 +-
 .../DatumFromMetadataAsDocumentProcessor.java   |  31 +-
 .../processor/DatumFromMetadataProcessor.java   |  17 +-
 .../processor/DocumentToMetadataProcessor.java  |  18 +-
 .../filebuffer/FileBufferConfigurator.java      |  53 ----
 .../filebuffer/FileBufferPersistReader.java     |   7 +-
 .../filebuffer/FileBufferPersistWriter.java     |  11 +-
 .../apache/streams/hbase/HbaseConfigurator.java |  61 ----
 .../streams/hbase/HbasePersistWriter.java       |  31 +-
 .../streams/hbase/HbasePersistWriterTask.java   |   2 +-
 .../apache/streams/hdfs/HdfsConfigurator.java   |  59 ----
 .../apache/streams/kafka/KafkaConfigurator.java |  49 ---
 .../streams/kafka/KafkaPersistReader.java       |  19 +-
 .../streams/kafka/KafkaPersistWriter.java       |  16 +-
 .../apache/streams/mongo/MongoConfigurator.java |  49 ---
 .../streams/mongo/MongoPersistReader.java       |  66 ++--
 .../streams/mongo/MongoPersistWriter.java       |  24 +-
 .../peoplepattern/AccountTypeProcessor.java     |  11 +-
 .../peoplepattern/DemographicsProcessor.java    |  11 +-
 .../provider/FacebookStreamConfigurator.java    |  66 ----
 .../com/google/gmail/GMailConfigurator.java     |  45 ---
 .../google/gmail/provider/GMailProvider.java    |  43 +--
 .../gplus/provider/AbstractGPlusProvider.java   |  21 +-
 .../gplus/provider/GPlusConfigurator.java       |  54 ----
 .../instagram/InstagramConfigurator.java        |  65 ----
 .../provider/InstagramAbstractProvider.java     |  26 +-
 .../InstagramRecentMediaProvider.java           |  21 +-
 .../data/moreover/MoreoverConfigurator.java     |  58 ----
 .../rss/provider/RssStreamConfigurator.java     |  48 ---
 .../streams/rss/provider/RssStreamProvider.java |  47 ++-
 .../serializer/SyndEntryActivitySerializer.java |  11 +-
 .../sysomos/config/SysomosConfigurator.java     |  39 ---
 .../FetchAndReplaceTwitterProcessor.java        |   9 +-
 .../twitter/provider/TwitterStreamProvider.java |  10 +-
 .../youtube/provider/YoutubeConfigurator.java   |  50 ---
 .../com/youtube/provider/YoutubeProvider.java   |  18 +-
 .../queues/ThroughputQueueMulitThreadTest.java  | 315 -------------------
 .../queues/ThroughputQueueMultiThreadTest.java  | 315 +++++++++++++++++++
 48 files changed, 609 insertions(+), 1660 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
 
b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
deleted file mode 100644
index ae17dbe..0000000
--- 
a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/HttpConfigurator.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.components.http;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Converts a {@link com.typesafe.config.Config} element into an instance of 
HttpConfiguration
- */
-public class HttpConfigurator {
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(HttpConfigurator.class);
-
-    private final static ObjectMapper mapper = new ObjectMapper();
-
-    public static HttpProviderConfiguration detectProviderConfiguration(Config 
config) {
-
-        HttpProviderConfiguration httpProviderConfiguration = null;
-
-        try {
-            httpProviderConfiguration = 
mapper.readValue(config.root().render(ConfigRenderOptions.concise()), 
HttpProviderConfiguration.class);
-        } catch (Exception e) {
-            e.printStackTrace();
-            LOGGER.warn("Could not parse http configuration", e.getMessage());
-        }
-        return httpProviderConfiguration;
-    }
-
-    public static HttpProcessorConfiguration 
detectProcessorConfiguration(Config config) {
-
-        HttpProcessorConfiguration httpProcessorConfiguration = null;
-
-        try {
-            httpProcessorConfiguration = 
mapper.readValue(config.root().render(ConfigRenderOptions.concise()), 
HttpProcessorConfiguration.class);
-        } catch (Exception e) {
-            e.printStackTrace();
-            LOGGER.warn("Could not parse http configuration", e.getMessage());
-        }
-        return httpProcessorConfiguration;
-    }
-
-    public static HttpPersistWriterConfiguration 
detectPersistWriterConfiguration(Config config) {
-
-        HttpPersistWriterConfiguration httpPersistWriterConfiguration = null;
-
-        try {
-            httpPersistWriterConfiguration = 
mapper.readValue(config.root().render(ConfigRenderOptions.concise()), 
HttpPersistWriterConfiguration.class);
-        } catch (Exception e) {
-            e.printStackTrace();
-            LOGGER.warn("Could not parse http configuration", e.getMessage());
-        }
-        return httpPersistWriterConfiguration;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java
 
b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java
index c7a0dd9..d8309d9 100644
--- 
a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java
+++ 
b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/persist/SimpleHTTPPostPersistWriter.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
-import com.google.common.collect.Maps;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpStatus;
@@ -34,8 +33,8 @@ import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.util.EntityUtils;
-import org.apache.streams.components.http.HttpConfigurator;
 import org.apache.streams.components.http.HttpPersistWriterConfiguration;
+import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
@@ -47,11 +46,9 @@ import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.HashMap;
 import java.util.Map;
 
-/**
- * Created by steve on 11/12/14.
- */
 public class SimpleHTTPPostPersistWriter implements StreamsPersistWriter {
 
     private final static String STREAMS_ID = "SimpleHTTPPostPersistWriter";
@@ -69,7 +66,8 @@ public class SimpleHTTPPostPersistWriter implements 
StreamsPersistWriter {
     protected String authHeader;
 
     public SimpleHTTPPostPersistWriter() {
-        
this(HttpConfigurator.detectPersistWriterConfiguration(StreamsConfigurator.config.getConfig("http")));
+        this(new ComponentConfigurator<>(HttpPersistWriterConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("http")));
     }
 
     public SimpleHTTPPostPersistWriter(HttpPersistWriterConfiguration 
configuration) {
@@ -128,8 +126,7 @@ public class SimpleHTTPPostPersistWriter implements 
StreamsPersistWriter {
      Override this to add parameters to the request
      */
     protected Map<String, String> prepareParams(StreamsDatum entry) {
-
-        return Maps.newHashMap();
+        return new HashMap<>();
     }
 
     /**
@@ -157,9 +154,7 @@ public class SimpleHTTPPostPersistWriter implements 
StreamsPersistWriter {
         try {
             String entity = mapper.writeValueAsString(payload);
             httppost.setEntity(new StringEntity(entity));
-        } catch (JsonProcessingException e) {
-            LOGGER.warn(e.getMessage());
-        } catch (UnsupportedEncodingException e) {
+        } catch (JsonProcessingException | UnsupportedEncodingException e) {
             LOGGER.warn(e.getMessage());
         }
         return httppost;
@@ -173,7 +168,7 @@ public class SimpleHTTPPostPersistWriter implements 
StreamsPersistWriter {
 
         CloseableHttpResponse response = null;
 
-        String entityString = null;
+        String entityString;
         try {
             response = httpclient.execute(httpPost);
             HttpEntity entity = response.getEntity();
@@ -186,8 +181,10 @@ public class SimpleHTTPPostPersistWriter implements 
StreamsPersistWriter {
             LOGGER.error("IO error:\n{}\n{}\n{}", httpPost.toString(), 
response, e.getMessage());
         } finally {
             try {
-                response.close();
-            } catch (IOException e) {}
+                if (response != null) {
+                    response.close();
+                }
+            } catch (IOException ignored) {}
         }
         return result;
     }
@@ -207,11 +204,7 @@ public class SimpleHTTPPostPersistWriter implements 
StreamsPersistWriter {
             uriBuilder = uriBuilder.addParameter("access_token", 
configuration.getAccessToken());
         if( !Strings.isNullOrEmpty(configuration.getUsername())
                 && !Strings.isNullOrEmpty(configuration.getPassword())) {
-            StringBuilder stringBuilder = new StringBuilder();
-            stringBuilder.append(configuration.getUsername());
-            stringBuilder.append(":");
-            stringBuilder.append(configuration.getPassword());
-            String string = stringBuilder.toString();
+            String string = configuration.getUsername() + ":" + 
configuration.getPassword();
             authHeader = Base64.encodeBase64String(string.getBytes());
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
 
b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
index 0d6747e..5868ba6 100644
--- 
a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
+++ 
b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
@@ -22,8 +22,6 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.http.HttpEntity;
 import org.apache.http.client.methods.CloseableHttpResponse;
@@ -32,13 +30,13 @@ import org.apache.http.client.utils.URIBuilder;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.util.EntityUtils;
-import org.apache.streams.components.http.HttpConfigurator;
 import org.apache.streams.components.http.HttpProcessorConfiguration;
+import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.pojo.extensions.ExtensionUtil;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.extensions.ExtensionUtil;
 import org.apache.streams.pojo.json.ActivityObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,6 +44,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -71,7 +71,8 @@ public class SimpleHTTPGetProcessor implements 
StreamsProcessor {
 
     protected String authHeader;
     public SimpleHTTPGetProcessor() {
-        
this(HttpConfigurator.detectProcessorConfiguration(StreamsConfigurator.config.getConfig("http")));
+        this(new ComponentConfigurator<>(HttpProcessorConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("http")));
     }
 
     public SimpleHTTPGetProcessor(HttpProcessorConfiguration 
processorConfiguration) {
@@ -147,7 +148,7 @@ public class SimpleHTTPGetProcessor implements 
StreamsProcessor {
     @Override
     public List<StreamsDatum> process(StreamsDatum entry) {
 
-        List<StreamsDatum> result = Lists.newArrayList();
+        List<StreamsDatum> result = new ArrayList<>();
 
         ObjectNode rootDocument = getRootDocument(entry);
 
@@ -172,8 +173,10 @@ public class SimpleHTTPGetProcessor implements 
StreamsProcessor {
             return result;
         } finally {
             try {
-                response.close();
-            } catch (IOException e) {}
+                if (response != null) {
+                    response.close();
+                }
+            } catch (IOException ignored) {}
         }
 
         if( entityString == null )
@@ -218,8 +221,7 @@ public class SimpleHTTPGetProcessor implements 
StreamsProcessor {
      Override this to add parameters to the request
      */
     protected Map<String, String> prepareParams(StreamsDatum entry) {
-
-        return Maps.newHashMap();
+        return new HashMap<>();
     }
 
     /**
@@ -251,11 +253,7 @@ public class SimpleHTTPGetProcessor implements 
StreamsProcessor {
             uriBuilder = uriBuilder.addParameter("access_token", 
configuration.getAccessToken());
         if( !Strings.isNullOrEmpty(configuration.getUsername())
             && !Strings.isNullOrEmpty(configuration.getPassword())) {
-            StringBuilder stringBuilder = new StringBuilder();
-            stringBuilder.append(configuration.getUsername());
-            stringBuilder.append(":");
-            stringBuilder.append(configuration.getPassword());
-            String string = stringBuilder.toString();
+            String string = configuration.getUsername() + ":" + 
configuration.getPassword();
             authHeader = Base64.encodeBase64String(string.getBytes());
         }
         httpclient = HttpClients.createDefault();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java
 
b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java
index 528e50c..f6089f6 100644
--- 
a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java
+++ 
b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java
@@ -22,12 +22,9 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.http.HttpEntity;
 import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
 import org.apache.http.client.methods.HttpPost;
 import org.apache.http.client.utils.URIBuilder;
 import org.apache.http.entity.ContentType;
@@ -35,8 +32,8 @@ import org.apache.http.entity.StringEntity;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.util.EntityUtils;
-import org.apache.streams.components.http.HttpConfigurator;
 import org.apache.streams.components.http.HttpProcessorConfiguration;
+import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
@@ -49,6 +46,8 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -72,7 +71,8 @@ public class SimpleHTTPPostProcessor implements 
StreamsProcessor {
     protected String authHeader;
 
     public SimpleHTTPPostProcessor() {
-        
this(HttpConfigurator.detectProcessorConfiguration(StreamsConfigurator.config.getConfig("http")));
+        this(new ComponentConfigurator<>(HttpProcessorConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("http")));
     }
 
     public SimpleHTTPPostProcessor(HttpProcessorConfiguration 
processorConfiguration) {
@@ -148,7 +148,7 @@ public class SimpleHTTPPostProcessor implements 
StreamsProcessor {
     @Override
     public List<StreamsDatum> process(StreamsDatum entry) {
 
-        List<StreamsDatum> result = Lists.newArrayList();
+        List<StreamsDatum> result = new ArrayList<>();
 
         ObjectNode rootDocument = getRootDocument(entry);
 
@@ -184,8 +184,10 @@ public class SimpleHTTPPostProcessor implements 
StreamsProcessor {
             return result;
         } finally {
             try {
-                response.close();
-            } catch (IOException e) {}
+                if (response != null) {
+                    response.close();
+                }
+            } catch (IOException ignored) {}
         }
 
         if( entityString == null )
@@ -213,19 +215,15 @@ public class SimpleHTTPPostProcessor implements 
StreamsProcessor {
      Override this to add parameters to the request
      */
     protected Map<String, String> prepareParams(StreamsDatum entry) {
-
-        return Maps.newHashMap();
+        return new HashMap<>();
     }
 
     /**
      Override this to add parameters to the request
      */
     protected HttpEntity preparePayload(StreamsDatum entry) {
-
-        HttpEntity entity = (new StringEntity("{}",
-                ContentType.create("application/json")));
-
-        return entity;
+        return new StringEntity("{}",
+                ContentType.create("application/json"));
     }
 
 
@@ -252,11 +250,7 @@ public class SimpleHTTPPostProcessor implements 
StreamsProcessor {
             uriBuilder = uriBuilder.addParameter("access_token", 
configuration.getAccessToken());
         if( !Strings.isNullOrEmpty(configuration.getUsername())
             && !Strings.isNullOrEmpty(configuration.getPassword())) {
-            StringBuilder stringBuilder = new StringBuilder();
-            stringBuilder.append(configuration.getUsername());
-            stringBuilder.append(":");
-            stringBuilder.append(configuration.getPassword());
-            String string = stringBuilder.toString();
+            String string = configuration.getUsername() + ":" + 
configuration.getPassword();
             authHeader = Base64.encodeBase64String(string.getBytes());
         }
         httpclient = HttpClients.createDefault();

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
 
b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
deleted file mode 100644
index fae01cc..0000000
--- 
a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHTTPGetProvider.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.components.http.provider;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import org.apache.http.HttpEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpGet;
-import org.apache.http.client.utils.URIBuilder;
-import org.apache.http.impl.client.CloseableHttpClient;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.util.EntityUtils;
-import org.apache.streams.components.http.HttpConfigurator;
-import org.apache.streams.components.http.HttpProviderConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProvider;
-import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.joda.time.DateTime;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.math.BigInteger;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.*;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-/**
- * @Deprecated
- * Replaced by SimpleHttpProvider, which can use GET or POST
- */
-@Deprecated
-public class SimpleHTTPGetProvider extends SimpleHttpProvider {
-
-    private final static String STREAMS_ID = "SimpleHTTPGetProcessor";
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java
 
b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java
index 46d1cca..2078647 100644
--- 
a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java
+++ 
b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java
@@ -24,9 +24,7 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.Uninterruptibles;
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.http.HttpEntity;
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpGet;
@@ -39,8 +37,8 @@ import org.apache.http.conn.ssl.TrustSelfSignedStrategy;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.util.EntityUtils;
-import org.apache.streams.components.http.HttpConfigurator;
 import org.apache.streams.components.http.HttpProviderConfiguration;
+import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProvider;
@@ -58,19 +56,15 @@ import java.security.KeyManagementException;
 import java.security.KeyStoreException;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
-import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -91,14 +85,15 @@ public class SimpleHttpProvider implements StreamsProvider {
 
     protected HttpProviderConfiguration configuration;
 
-    protected volatile Queue<StreamsDatum> providerQueue = new 
ConcurrentLinkedQueue<StreamsDatum>();
+    protected volatile Queue<StreamsDatum> providerQueue = new 
ConcurrentLinkedQueue<>();
 
     protected final ReadWriteLock lock = new ReentrantReadWriteLock();
 
     private ExecutorService executor;
 
     public SimpleHttpProvider() {
-        
this(HttpConfigurator.detectProviderConfiguration(StreamsConfigurator.config.getConfig("http")));
+        this(new ComponentConfigurator<>(HttpProviderConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("http")));
     }
 
     public SimpleHttpProvider(HttpProviderConfiguration providerConfiguration) 
{
@@ -116,8 +111,7 @@ public class SimpleHttpProvider implements StreamsProvider {
       Override this to add parameters to the request
      */
     protected Map<String, String> prepareParams(StreamsDatum entry) {
-
-        return Maps.newHashMap();
+        return new HashMap<>();
     }
 
     public HttpRequestBase prepareHttpRequest(URI uri) {
@@ -154,11 +148,7 @@ public class SimpleHttpProvider implements StreamsProvider 
{
             builder.loadTrustMaterial(null, new TrustSelfSignedStrategy());
             sslsf = new SSLConnectionSocketFactory(
                     builder.build(), 
SSLConnectionSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
-        } catch (NoSuchAlgorithmException e) {
-            LOGGER.warn(e.getMessage());
-        } catch (KeyManagementException e) {
-            LOGGER.warn(e.getMessage());
-        } catch (KeyStoreException e) {
+        } catch (NoSuchAlgorithmException | KeyManagementException | 
KeyStoreException e) {
             LOGGER.warn(e.getMessage());
         }
 
@@ -243,7 +233,7 @@ public class SimpleHttpProvider implements StreamsProvider {
 
         CloseableHttpResponse response = null;
 
-        String entityString = null;
+        String entityString;
         try {
             response = httpclient.execute(httpRequest);
             HttpEntity entity = response.getEntity();
@@ -259,8 +249,10 @@ public class SimpleHttpProvider implements StreamsProvider 
{
             LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, 
e.getMessage());
         } finally {
             try {
-                response.close();
-            } catch (IOException e) {}
+                if (response != null) {
+                    response.close();
+                }
+            } catch (IOException ignored) {}
         }
         return results;
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java
----------------------------------------------------------------------
diff --git 
a/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java
 
b/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java
index 9fa20b6..6a8fb1d 100644
--- 
a/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java
+++ 
b/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java
@@ -19,14 +19,12 @@
 package org.apache.streams.config;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.reflect.TypeToken;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigRenderOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Serializable;
 import java.net.MalformedURLException;
 import java.net.URL;
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
 
b/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
deleted file mode 100644
index de32898..0000000
--- 
a/streams-contrib/streams-amazon-aws/streams-persist-s3/src/main/java/org/apache/streams/s3/S3Configurator.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.streams.s3;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class S3Configurator {
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(S3Configurator.class);
-
-    private final static ObjectMapper mapper = new ObjectMapper();
-
-    public static S3Configuration detectConfiguration(Config s3) {
-
-        S3Configuration s3Configuration = null;
-
-        try {
-            s3Configuration = 
mapper.readValue(s3.root().render(ConfigRenderOptions.concise()), 
S3Configuration.class);
-        } catch (Exception e) {
-            LOGGER.warn("Could not parse S3Configuration", e);
-        }
-
-        return s3Configuration;
-    }
-
-    public static S3ReaderConfiguration detectReaderConfiguration(Config s3) {
-
-        S3ReaderConfiguration s3Configuration = null;
-
-        try {
-            s3Configuration = 
mapper.readValue(s3.root().render(ConfigRenderOptions.concise()), 
S3ReaderConfiguration.class);
-        } catch (Exception e) {
-            LOGGER.warn("Could not parse S3Configuration", e);
-        }
-
-        return s3Configuration;
-    }
-
-    public static S3WriterConfiguration detectWriterConfiguration(Config s3) {
-
-        S3WriterConfiguration s3Configuration = null;
-
-        try {
-            s3Configuration = 
mapper.readValue(s3.root().render(ConfigRenderOptions.concise()), 
S3WriterConfiguration.class);
-
-            if(!s3Configuration.getWriterPath().endsWith("/")) {
-                s3Configuration.setWriterPath(s3Configuration.getWriterPath() 
+ "/");
-            }
-        } catch (Exception e) {
-            LOGGER.warn("Could not parse S3Configuration", e);
-        }
-
-        return s3Configuration;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
deleted file mode 100644
index 439b5de..0000000
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchConfigurator.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.elasticsearch;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Converts a {@link com.typesafe.config.Config} element into an instance of 
ElasticSearchConfiguration
- */
-public class ElasticsearchConfigurator {
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(ElasticsearchConfigurator.class);
-
-    private final static ObjectMapper mapper = new ObjectMapper();
-
-    public static ElasticsearchConfiguration detectConfiguration(Config 
elasticsearch) {
-
-        ElasticsearchConfiguration elasticsearchConfiguration = null;
-
-        try {
-            elasticsearchConfiguration = 
mapper.readValue(elasticsearch.root().render(ConfigRenderOptions.concise()), 
ElasticsearchConfiguration.class);
-        } catch (Exception e) {
-            e.printStackTrace();
-            LOGGER.warn("Could not parse elasticsearchconfiguration");
-        }
-
-        return elasticsearchConfiguration;
-    }
-
-    public static ElasticsearchReaderConfiguration 
detectReaderConfiguration(Config elasticsearch) {
-
-        ElasticsearchReaderConfiguration elasticsearchReaderConfiguration = 
null;
-
-        try {
-            elasticsearchReaderConfiguration = 
mapper.readValue(elasticsearch.root().render(ConfigRenderOptions.concise()), 
ElasticsearchReaderConfiguration.class);
-        } catch (Exception e) {
-            e.printStackTrace();
-            LOGGER.warn("Could not parse elasticsearchconfiguration");
-        }
-
-        return elasticsearchReaderConfiguration;
-    }
-
-    public static ElasticsearchWriterConfiguration 
detectWriterConfiguration(Config elasticsearch) {
-
-        ElasticsearchWriterConfiguration elasticsearchWriterConfiguration = 
null;
-
-        try {
-            elasticsearchWriterConfiguration = 
mapper.readValue(elasticsearch.root().render(ConfigRenderOptions.concise()), 
ElasticsearchWriterConfiguration.class);
-        } catch (Exception e) {
-            e.printStackTrace();
-            LOGGER.warn("Could not parse elasticsearchwriterconfiguration");
-        }
-        return elasticsearchWriterConfiguration;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
index b268fae..49523f8 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchPersistWriter.java
@@ -22,9 +22,13 @@ package org.apache.streams.elasticsearch;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.*;
+import org.apache.streams.core.DatumStatus;
+import org.apache.streams.core.DatumStatusCountable;
+import org.apache.streams.core.DatumStatusCounter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
@@ -45,8 +49,12 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.text.DecimalFormat;
 import java.text.NumberFormat;
-import java.util.*;
-import java.util.concurrent.TimeUnit;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -57,7 +65,7 @@ public class ElasticsearchPersistWriter implements 
StreamsPersistWriter, DatumSt
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ElasticsearchPersistWriter.class);
     private static final NumberFormat MEGABYTE_FORMAT = new 
DecimalFormat("#.##");
     private static final NumberFormat NUMBER_FORMAT = new 
DecimalFormat("###,###,###,###");
-    private static final Long DEFAULT_BULK_FLUSH_THRESHOLD = 5l * 1024l * 
1024l;
+    private static final Long DEFAULT_BULK_FLUSH_THRESHOLD = 5L * 1024L * 
1024L;
     private static final int DEFAULT_BATCH_SIZE = 100;
     //ES defaults its bulk index queue to 50 items.  We want to be under this 
on our backoff so set this to 1/2 ES default
     //at a batch size as configured here.
@@ -67,7 +75,7 @@ public class ElasticsearchPersistWriter implements 
StreamsPersistWriter, DatumSt
 
     protected static final ObjectMapper OBJECT_MAPPER = 
StreamsJacksonMapper.getInstance();
 
-    protected final List<String> affectedIndexes = new ArrayList<String>();
+    protected final List<String> affectedIndexes = new ArrayList<>();
 
     protected final ElasticsearchClientManager manager;
     protected final ElasticsearchWriterConfiguration config;
@@ -96,7 +104,8 @@ public class ElasticsearchPersistWriter implements 
StreamsPersistWriter, DatumSt
     private final AtomicLong totalSizeInBytes = new AtomicLong(0);
 
     public ElasticsearchPersistWriter() {
-        
this(ElasticsearchConfigurator.detectWriterConfiguration(StreamsConfigurator.config.getConfig("elasticsearch")));
+        this(new 
ComponentConfigurator<>(ElasticsearchWriterConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch")));
     }
 
     public ElasticsearchPersistWriter(ElasticsearchWriterConfiguration config) 
{
@@ -170,10 +179,7 @@ public class ElasticsearchPersistWriter implements 
StreamsPersistWriter, DatumSt
     }
 
     protected String docAsJson(Object streamsDocument) throws IOException {
-
-        String docAsJson = (streamsDocument instanceof String) ? 
streamsDocument.toString() : OBJECT_MAPPER.writeValueAsString(streamsDocument);
-
-        return docAsJson;
+        return (streamsDocument instanceof String) ? 
streamsDocument.toString() : OBJECT_MAPPER.writeValueAsString(streamsDocument);
     }
 
     protected StreamsDatum appendMetadata(StreamsDatum streamsDatum) throws 
IOException {
@@ -210,7 +216,7 @@ public class ElasticsearchPersistWriter implements 
StreamsPersistWriter, DatumSt
             LOGGER.warn("This is unexpected: {}", e);
         } finally {
 
-            if( veryLargeBulk == true ) {
+            if(veryLargeBulk) {
                 resetRefreshInterval();
             }
 
@@ -219,7 +225,8 @@ public class ElasticsearchPersistWriter implements 
StreamsPersistWriter, DatumSt
                 LOGGER.debug("refreshIndexes completed");
             }
 
-            LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] 
Orphaned[{}]", this.totalOk.get(), this.totalFailed.get(), 
this.getTotalOutstanding());
+            LOGGER.debug("Closed ElasticSearch Writer: Ok[{}] Failed[{}] 
Orphaned[{}]",
+              this.totalOk.get(), this.totalFailed.get(), 
this.getTotalOutstanding());
             timer.cancel();
 
             LOGGER.debug("cleanUp completed");
@@ -275,7 +282,7 @@ public class ElasticsearchPersistWriter implements 
StreamsPersistWriter, DatumSt
             return;
 
         // wait for one minute to catch up if it needs to
-        waitToCatchUp(5, 1 * 60 * 1000);
+        waitToCatchUp(5, 60 * 1000);
 
         // call the flush command.
         flush(this.bulkRequest, this.currentBatchItems.get(), 
this.currentBatchBytes.get());
@@ -306,7 +313,7 @@ public class ElasticsearchPersistWriter implements 
StreamsPersistWriter, DatumSt
     private void checkForBackOff() {
         try {
             if (this.getTotalOutstanding() > WAITING_DOCS_LIMIT) {
-                
/****************************************************************************
+                /*
                  * Author:
                  * Smashew
                  *

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
index 03f40d6..d9e9273 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
@@ -19,19 +19,21 @@
 package org.apache.streams.elasticsearch;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.collect.Lists;
+import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.script.Script;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.sort.SortBuilders;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
@@ -42,10 +44,9 @@ public class ElasticsearchQuery implements 
Iterable<SearchHit>, Iterator<SearchH
 
     private ElasticsearchClientManager elasticsearchClientManager;
     private ElasticsearchReaderConfiguration config;
-    private List<String> indexes = Lists.newArrayList();
-    private List<String> types = Lists.newArrayList();
+    private List<String> indexes = new ArrayList<>();
+    private List<String> types = new ArrayList<>();
     private int limit = 1000 * 1000 * 1000; // we are going to set the default 
limit very high to 1bil
-    private boolean random = false;
     private int batchSize = 100;
     private String scrollTimeout = "5m";
     private org.elasticsearch.index.query.QueryBuilder queryBuilder;
@@ -59,7 +60,8 @@ public class ElasticsearchQuery implements 
Iterable<SearchHit>, Iterator<SearchH
     private StreamsJacksonMapper mapper = StreamsJacksonMapper.getInstance();
 
     public ElasticsearchQuery() {
-        
this(ElasticsearchConfigurator.detectReaderConfiguration(StreamsConfigurator.config.getConfig("elasticsearch")));
+        this(new 
ComponentConfigurator<>(ElasticsearchReaderConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch")));
     }
 
     public ElasticsearchQuery(ElasticsearchReaderConfiguration config) {
@@ -141,8 +143,9 @@ public class ElasticsearchQuery implements 
Iterable<SearchHit>, Iterator<SearchH
                 search = search.setTypes(types.toArray(new String[0]));
 
             // TODO: Replace when all clusters are upgraded past 0.90.4 so we 
can implement a RANDOM scroll.
-            if (this.random)
-                search = search.addSort(SortBuilders.scriptSort("random()", 
"number"));
+            boolean random = false;
+            if (random)
+                search = search.addSort(SortBuilders.scriptSort(new 
Script("random()"), "number"));
         }
 
         // We don't have a scroll, we need to create a scroll

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
index 355c471..26012ef 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
@@ -18,18 +18,20 @@
 
 package org.apache.streams.elasticsearch.processor;
 
-import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.typesafe.config.Config;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.elasticsearch.ElasticsearchClientManager;
-import org.apache.streams.elasticsearch.ElasticsearchConfigurator;
 import org.apache.streams.elasticsearch.ElasticsearchMetadataUtil;
 import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
 import org.apache.streams.jackson.StreamsJacksonMapper;
@@ -37,18 +39,12 @@ import org.elasticsearch.action.get.GetRequestBuilder;
 import org.elasticsearch.action.get.GetResponse;
 import org.joda.time.DateTime;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
 /**
  * Uses index and type in metadata map stored in datum document to populate 
current document into datums
  */
 public class DatumFromMetadataAsDocumentProcessor implements StreamsProcessor, 
Serializable {
 
-    public final static String STREAMS_ID = "DatumFromMetadataProcessor";
+    private final static String STREAMS_ID = "DatumFromMetadataProcessor";
 
     private ElasticsearchClientManager elasticsearchClientManager;
     private ElasticsearchReaderConfiguration config;
@@ -56,12 +52,13 @@ public class DatumFromMetadataAsDocumentProcessor 
implements StreamsProcessor, S
     private ObjectMapper mapper;
 
     public DatumFromMetadataAsDocumentProcessor() {
-        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
-        this.config = 
ElasticsearchConfigurator.detectReaderConfiguration(config);
+        this.config = new 
ComponentConfigurator<>(ElasticsearchReaderConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch"));
     }
 
     public DatumFromMetadataAsDocumentProcessor(Config config) {
-        this.config = 
ElasticsearchConfigurator.detectReaderConfiguration(config);
+        this.config = new 
ComponentConfigurator<>(ElasticsearchReaderConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch"));
     }
 
     public 
DatumFromMetadataAsDocumentProcessor(ElasticsearchReaderConfiguration config) {
@@ -75,7 +72,7 @@ public class DatumFromMetadataAsDocumentProcessor implements 
StreamsProcessor, S
 
     @Override
     public List<StreamsDatum> process(StreamsDatum entry) {
-        List<StreamsDatum> result = Lists.newArrayList();
+        List<StreamsDatum> result = new ArrayList<>();
 
         ObjectNode metadataObjectNode;
         try {
@@ -86,7 +83,7 @@ public class DatumFromMetadataAsDocumentProcessor implements 
StreamsProcessor, S
 
         Map<String, Object> metadata = 
ElasticsearchMetadataUtil.asMap(metadataObjectNode);
 
-        if(entry == null || entry.getMetadata() == null)
+        if(entry.getMetadata() == null)
             return result;
 
         String index = ElasticsearchMetadataUtil.getIndex(metadata, config);
@@ -98,7 +95,7 @@ public class DatumFromMetadataAsDocumentProcessor implements 
StreamsProcessor, S
         getRequestBuilder.setFetchSource(true);
         GetResponse getResponse = getRequestBuilder.get();
 
-        if( getResponse == null || getResponse.isExists() == false || 
getResponse.isSourceEmpty() == true )
+        if( getResponse == null || !getResponse.isExists() || 
getResponse.isSourceEmpty())
             return result;
 
         entry.setDocument(getResponse.getSource());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
index 512e191..f1b7bfc 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
@@ -18,13 +18,12 @@
 
 package org.apache.streams.elasticsearch.processor;
 
-import com.google.common.collect.Lists;
 import com.typesafe.config.Config;
+import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.elasticsearch.ElasticsearchClientManager;
-import org.apache.streams.elasticsearch.ElasticsearchConfigurator;
 import org.apache.streams.elasticsearch.ElasticsearchMetadataUtil;
 import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
 import org.elasticsearch.action.get.GetRequestBuilder;
@@ -32,6 +31,7 @@ import org.elasticsearch.action.get.GetResponse;
 import org.joda.time.DateTime;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -40,18 +40,19 @@ import java.util.Map;
  */
 public class DatumFromMetadataProcessor implements StreamsProcessor, 
Serializable {
 
-    public final static String STREAMS_ID = "DatumFromMetadataProcessor";
+    private final static String STREAMS_ID = "DatumFromMetadataProcessor";
 
     private ElasticsearchClientManager elasticsearchClientManager;
     private ElasticsearchReaderConfiguration config;
 
     public DatumFromMetadataProcessor() {
-        Config config = StreamsConfigurator.config.getConfig("elasticsearch");
-        this.config = 
ElasticsearchConfigurator.detectReaderConfiguration(config);
+        this.config = new 
ComponentConfigurator<>(ElasticsearchReaderConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch"));
     }
 
     public DatumFromMetadataProcessor(Config config) {
-        this.config = 
ElasticsearchConfigurator.detectReaderConfiguration(config);
+        this.config = new 
ComponentConfigurator<>(ElasticsearchReaderConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch"));
     }
 
     public DatumFromMetadataProcessor(ElasticsearchReaderConfiguration config) 
{
@@ -65,7 +66,7 @@ public class DatumFromMetadataProcessor implements 
StreamsProcessor, Serializabl
 
     @Override
     public List<StreamsDatum> process(StreamsDatum entry) {
-        List<StreamsDatum> result = Lists.newArrayList();
+        List<StreamsDatum> result = new ArrayList<>();
 
         if(entry == null || entry.getMetadata() == null)
             return result;
@@ -81,7 +82,7 @@ public class DatumFromMetadataProcessor implements 
StreamsProcessor, Serializabl
         getRequestBuilder.setFetchSource(true);
         GetResponse getResponse = getRequestBuilder.get();
 
-        if( getResponse == null || getResponse.isExists() == false || 
getResponse.isSourceEmpty() == true )
+        if( getResponse == null || getResponse.isExists() || 
getResponse.isSourceEmpty() )
             return result;
 
         entry.setDocument(getResponse.getSource());

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
index 41cc12d..9a08654 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
@@ -18,28 +18,18 @@
 
 package org.apache.streams.elasticsearch.processor;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.elasticsearch.ElasticsearchClientManager;
-import org.apache.streams.elasticsearch.ElasticsearchConfigurator;
 import org.apache.streams.elasticsearch.ElasticsearchMetadataUtil;
-import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.io.Serializable;
-import java.util.Iterator;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -51,7 +41,7 @@ import java.util.Map;
  */
 public class DocumentToMetadataProcessor implements StreamsProcessor, 
Serializable {
 
-    public final static String STREAMS_ID = "DatumFromMetadataProcessor";
+    private final static String STREAMS_ID = "DatumFromMetadataProcessor";
 
     private ObjectMapper mapper;
 
@@ -67,7 +57,7 @@ public class DocumentToMetadataProcessor implements 
StreamsProcessor, Serializab
 
     @Override
     public List<StreamsDatum> process(StreamsDatum entry) {
-        List<StreamsDatum> result = Lists.newArrayList();
+        List<StreamsDatum> result = new ArrayList<>();
 
         Object object = entry.getDocument();
         ObjectNode metadataObjectNode;
@@ -81,7 +71,7 @@ public class DocumentToMetadataProcessor implements 
StreamsProcessor, Serializab
 
         Map<String, Object> metadata = 
ElasticsearchMetadataUtil.asMap(metadataObjectNode);
 
-        if(entry == null || metadata == null)
+        if(metadata == null)
             return result;
 
         entry.setMetadata(metadata);

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferConfigurator.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferConfigurator.java
 
b/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferConfigurator.java
deleted file mode 100644
index 23e6d7c..0000000
--- 
a/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferConfigurator.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.filebuffer;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Resolves FileBufferConfiguration from typesafe
- *
- * Deprecated: use ComponentConfigurator<FileBufferConfiguration> instead.
- */
-@Deprecated
-public class FileBufferConfigurator {
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(FileBufferConfigurator.class);
-
-    private final static ObjectMapper mapper = new ObjectMapper();
-
-    public static FileBufferConfiguration detectConfiguration(Config kafka) {
-
-        FileBufferConfiguration fileConfiguration = null;
-
-        try {
-            fileConfiguration = 
mapper.readValue(kafka.root().render(ConfigRenderOptions.concise()), 
FileBufferConfiguration.class);
-        } catch (Exception e) {
-            e.printStackTrace();
-            LOGGER.warn("Could not parse FileConfiguration");
-        }
-
-        return fileConfiguration;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java
 
b/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java
index 39eb853..504ea5e 100644
--- 
a/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java
+++ 
b/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java
@@ -22,11 +22,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Queues;
 import com.squareup.tape.QueueFile;
+import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistReader;
 import org.apache.streams.core.StreamsResultSet;
-import org.apache.streams.filebuffer.FileBufferConfiguration;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,7 +48,7 @@ import java.util.concurrent.Executors;
  */
 public class FileBufferPersistReader implements StreamsPersistReader, 
Serializable {
 
-    public final static String STREAMS_ID = "FileBufferPersistReader";
+    public static final String STREAMS_ID = "FileBufferPersistReader";
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(FileBufferPersistReader.class);
 
@@ -66,7 +66,8 @@ public class FileBufferPersistReader implements 
StreamsPersistReader, Serializab
     private ExecutorService executor = Executors.newSingleThreadExecutor();
 
     public FileBufferPersistReader() {
-        
this(FileBufferConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("filebuffer")));
+        this(new ComponentConfigurator<>(FileBufferConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("filebuffer")));
     }
 
     public FileBufferPersistReader(FileBufferConfiguration config) {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java
 
b/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java
index 89c45c8..4dea85c 100644
--- 
a/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java
+++ 
b/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java
@@ -22,10 +22,10 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.squareup.tape.QueueFile;
+import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
-import org.apache.streams.filebuffer.FileBufferConfiguration;
 import org.apache.streams.util.GuidUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,7 +54,8 @@ public class FileBufferPersistWriter implements 
StreamsPersistWriter, Serializab
     private QueueFile queueFile;
 
     public FileBufferPersistWriter() {
-       
this(FileBufferConfigurator.detectConfiguration(StreamsConfigurator.config.getConfig("filebuffer")));
+       this(new ComponentConfigurator<>(FileBufferConfiguration.class)
+         
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("filebuffer")));
     }
 
     public FileBufferPersistWriter(FileBufferConfiguration config) {
@@ -71,9 +72,9 @@ public class FileBufferPersistWriter implements 
StreamsPersistWriter, Serializab
 
         String key = entry.getId() != null ? entry.getId() : 
GuidUtils.generateGuid("filewriter");
 
-        Preconditions.checkArgument(Strings.isNullOrEmpty(key) == false);
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(key));
         Preconditions.checkArgument(entry.getDocument() instanceof String);
-        
Preconditions.checkArgument(Strings.isNullOrEmpty((String)entry.getDocument()) 
== false);
+        Preconditions.checkArgument(!Strings.isNullOrEmpty((String) 
entry.getDocument()));
 
         byte[] item = ((String)entry.getDocument()).getBytes();
         try {
@@ -101,7 +102,7 @@ public class FileBufferPersistWriter implements 
StreamsPersistWriter, Serializab
 
         Preconditions.checkNotNull(queueFile);
 
-        this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
+        this.persistQueue  = new ConcurrentLinkedQueue<>();
 
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbaseConfigurator.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbaseConfigurator.java
 
b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbaseConfigurator.java
deleted file mode 100644
index 0d8c8f3..0000000
--- 
a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbaseConfigurator.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.hbase;
-
-import com.typesafe.config.Config;
-import org.apache.streams.config.StreamsConfigurator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class HbaseConfigurator {
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(HbaseConfigurator.class);
-
-    public static HbaseConfiguration detectConfiguration() {
-
-        Config zookeeper = StreamsConfigurator.config.getConfig("zookeeper");
-        Config hbase = StreamsConfigurator.config.getConfig("hbase");
-
-        String rootdir = hbase.getString("rootdir");
-
-        Config znode = zookeeper.getConfig("znode");
-
-        String rootserver = znode.getString("rootserver");
-        String parent = znode.getString("parent");
-        Integer clientPort = 
hbase.getConfig("zookeeper").getConfig("property").getInt("clientPort");
-        String quorum = hbase.getConfig("zookeeper").getString("quorum");
-
-        HbaseConfiguration hbaseConfiguration = new HbaseConfiguration();
-
-        hbaseConfiguration.setRootdir(rootdir);
-        hbaseConfiguration.setRootserver(rootserver);
-        hbaseConfiguration.setParent(parent);
-        hbaseConfiguration.setQuorum(quorum);
-        hbaseConfiguration.setClientPort(clientPort.longValue());
-        hbaseConfiguration.setTable(hbase.getString("table"));
-        hbaseConfiguration.setFamily(hbase.getString("family"));
-        hbaseConfiguration.setQualifier(hbase.getString("qualifier"));
-
-        return hbaseConfiguration;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java
 
b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java
index 4a51fab..1e066fb 100644
--- 
a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java
+++ 
b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java
@@ -20,21 +20,27 @@ package org.apache.streams.hbase;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.HTablePool;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
 import org.apache.streams.util.GuidUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
-import java.io.Flushable;
-import java.io.IOException;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
 public class HbasePersistWriter implements StreamsPersistWriter, Flushable, 
Closeable
 {
     public final static String STREAMS_ID = "HbasePersistWriter";
@@ -53,12 +59,14 @@ public class HbasePersistWriter implements 
StreamsPersistWriter, Flushable, Clos
     private HbaseConfiguration config;
 
     public HbasePersistWriter() {
-        this.config = HbaseConfigurator.detectConfiguration();
-        this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
+        this.config = new ComponentConfigurator<>(HbaseConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("hbase"));
+        this.persistQueue  = new ConcurrentLinkedQueue<>();
     }
 
     public HbasePersistWriter(Queue<StreamsDatum> persistQueue) {
-        this.config = HbaseConfigurator.detectConfiguration();
+        this.config = new ComponentConfigurator<>(HbaseConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("hbase"));
         this.persistQueue = persistQueue;
     }
 
@@ -110,7 +118,6 @@ public class HbasePersistWriter implements 
StreamsPersistWriter, Flushable, Clos
         {
             LOGGER.error("There was an error connecting to HBase, please check 
your settings and try again");
             e.printStackTrace();
-            return;
         }
     }
 
@@ -164,7 +171,6 @@ public class HbasePersistWriter implements 
StreamsPersistWriter, Flushable, Clos
         } catch (IOException e) {
             e.printStackTrace();
             LOGGER.warn("Failure executin put: {}", 
streamsDatum.getDocument().toString());
-            return;
         }
 
     }
@@ -191,7 +197,6 @@ public class HbasePersistWriter implements 
StreamsPersistWriter, Flushable, Clos
             task.join();
         } catch (InterruptedException e) {
             e.printStackTrace();
-            return;
         }
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriterTask.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriterTask.java
 
b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriterTask.java
index 5484f8e..19a398d 100644
--- 
a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriterTask.java
+++ 
b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriterTask.java
@@ -48,7 +48,7 @@ public class HbasePersistWriterTask implements Runnable {
             }
             try {
                 Thread.sleep(new Random().nextInt(1));
-            } catch (InterruptedException e) {}
+            } catch (InterruptedException ignored) {}
         }
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java
 
b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java
deleted file mode 100644
index c4823c3..0000000
--- 
a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConfigurator.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.hdfs;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigRenderOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class HdfsConfigurator {
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(HdfsConfigurator.class);
-
-    private final static ObjectMapper mapper = new ObjectMapper();
-
-    public static HdfsConfiguration detectConfiguration(Config hdfs) {
-
-        HdfsConfiguration hdfsConfiguration = null;
-
-        try {
-            hdfsConfiguration = 
mapper.readValue(hdfs.root().render(ConfigRenderOptions.concise()), 
HdfsConfiguration.class);
-        } catch (Exception e) {
-            e.printStackTrace();
-            LOGGER.warn("Could not parse HdfsConfiguration");
-        }
-        return hdfsConfiguration;
-    }
-
-    public static HdfsReaderConfiguration detectReaderConfiguration(Config 
hdfs) {
-
-        return mapper.convertValue(detectConfiguration(hdfs), 
HdfsReaderConfiguration.class);
-    }
-
-    public static HdfsWriterConfiguration detectWriterConfiguration(Config 
hdfs) {
-
-        return mapper.convertValue(detectConfiguration(hdfs), 
HdfsWriterConfiguration.class);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaConfigurator.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaConfigurator.java
 
b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaConfigurator.java
deleted file mode 100644
index 9f64ae6..0000000
--- 
a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaConfigurator.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.kafka;
-
-import com.typesafe.config.Config;
-import org.apache.streams.config.StreamsConfigurator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Created by sblackmon on 12/10/13.
- */
-public class KafkaConfigurator {
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(KafkaConfigurator.class);
-
-    public static KafkaConfiguration detectConfiguration(Config kafka) {
-        String brokerlist = 
StreamsConfigurator.config.getString("kafka.metadata.broker.list");
-        String zkconnect = 
StreamsConfigurator.config.getString("kafka.zkconnect");
-        String topic = StreamsConfigurator.config.getString("kafka.topic");
-        String groupId = StreamsConfigurator.config.getString("kafka.groupid");
-
-        KafkaConfiguration kafkaConfiguration = new KafkaConfiguration();
-
-        kafkaConfiguration.setBrokerlist(brokerlist);
-        kafkaConfiguration.setZkconnect(zkconnect);
-        kafkaConfiguration.setTopic(topic);
-        kafkaConfiguration.setGroupId(groupId);
-
-        return kafkaConfiguration;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
 
b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
index 7d31d41..d54e794 100644
--- 
a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
+++ 
b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistReader.java
@@ -19,7 +19,6 @@
 package org.apache.streams.kafka;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
 import kafka.consumer.Consumer;
 import kafka.consumer.ConsumerConfig;
 import kafka.consumer.KafkaStream;
@@ -27,6 +26,7 @@ import kafka.consumer.Whitelist;
 import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.serializer.StringDecoder;
 import kafka.utils.VerifiableProperties;
+import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistReader;
@@ -35,8 +35,6 @@ import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.streams.kafka.KafkaConfiguration;
-
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.List;
@@ -59,7 +57,6 @@ public class KafkaPersistReader implements 
StreamsPersistReader, Serializable {
 
     private KafkaConfiguration config;
 
-    private ConsumerConfig consumerConfig;
     private ConsumerConnector consumerConnector;
 
     public List<KafkaStream<String, String>> inStreams;
@@ -67,14 +64,14 @@ public class KafkaPersistReader implements 
StreamsPersistReader, Serializable {
     private ExecutorService executor = Executors.newSingleThreadExecutor();
 
     public KafkaPersistReader() {
-        Config config = StreamsConfigurator.config.getConfig("kafka");
-        this.config = KafkaConfigurator.detectConfiguration(config);
-        this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
+        this.config = new ComponentConfigurator<>(KafkaConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka"));
+        this.persistQueue  = new ConcurrentLinkedQueue<>();
     }
 
     public KafkaPersistReader(Queue<StreamsDatum> persistQueue) {
-        Config config = StreamsConfigurator.config.getConfig("kafka");
-        this.config = KafkaConfigurator.detectConfiguration(config);
+        this.config = new ComponentConfigurator<>(KafkaConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka"));
         this.persistQueue = persistQueue;
     }
 
@@ -93,7 +90,7 @@ public class KafkaPersistReader implements 
StreamsPersistReader, Serializable {
         Properties props = new Properties();
         props.setProperty("serializer.encoding", "UTF8");
 
-        consumerConfig = new ConsumerConfig(props);
+        ConsumerConfig consumerConfig = new ConsumerConfig(props);
 
         consumerConnector = 
Consumer.createJavaConsumerConnector(consumerConfig);
 
@@ -154,7 +151,7 @@ public class KafkaPersistReader implements 
StreamsPersistReader, Serializable {
         while( !executor.isTerminated()) {
             try {
                 executor.awaitTermination(5, TimeUnit.SECONDS);
-            } catch (InterruptedException e) {}
+            } catch (InterruptedException ignored) {}
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/b273496c/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
 
b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
index 0f685c5..83032e6 100644
--- 
a/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
+++ 
b/streams-contrib/streams-persist-kafka/src/main/java/org/apache/streams/kafka/KafkaPersistWriter.java
@@ -20,10 +20,10 @@ package org.apache.streams.kafka;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
 import kafka.javaapi.producer.Producer;
 import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
+import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
@@ -51,14 +51,14 @@ public class KafkaPersistWriter implements 
StreamsPersistWriter, Serializable, R
     private Producer<String, String> producer;
 
     public KafkaPersistWriter() {
-        Config config = StreamsConfigurator.config.getConfig("kafka");
-        this.config = KafkaConfigurator.detectConfiguration(config);
-        this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
+        this.config = new ComponentConfigurator<>(KafkaConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka"));
+        this.persistQueue  = new ConcurrentLinkedQueue<>();
     }
 
     public KafkaPersistWriter(Queue<StreamsDatum> persistQueue) {
-        Config config = StreamsConfigurator.config.getConfig("kafka");
-        this.config = KafkaConfigurator.detectConfiguration(config);
+        this.config = new ComponentConfigurator<>(KafkaConfiguration.class)
+          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("kafka"));
         this.persistQueue = persistQueue;
     }
 
@@ -76,7 +76,7 @@ public class KafkaPersistWriter implements 
StreamsPersistWriter, Serializable, R
 
         ProducerConfig config = new ProducerConfig(props);
 
-        producer = new Producer<String, String>(config);
+        producer = new Producer<>(config);
 
         new Thread(new KafkaPersistWriterTask(this)).start();
     }
@@ -106,7 +106,7 @@ public class KafkaPersistWriter implements 
StreamsPersistWriter, Serializable, R
 
             String hash = GuidUtils.generateGuid(text);
 
-            KeyedMessage<String, String> data = new KeyedMessage<String, 
String>(config.getTopic(), hash, text);
+            KeyedMessage<String, String> data = new 
KeyedMessage<>(config.getTopic(), hash, text);
 
             producer.send(data);
 


Reply via email to