http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
 
b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
index 5868ba6..871a08b 100644
--- 
a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
+++ 
b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPGetProcessor.java
@@ -18,10 +18,20 @@
 
 package org.apache.streams.components.http.processor;
 
+import org.apache.streams.components.http.HttpProcessorConfiguration;
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.extensions.ExtensionUtil;
+import org.apache.streams.pojo.json.ActivityObject;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Strings;
+
 import org.apache.commons.codec.binary.Base64;
 import org.apache.http.HttpEntity;
 import org.apache.http.client.methods.CloseableHttpResponse;
@@ -30,14 +40,6 @@ import org.apache.http.client.utils.URIBuilder;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
 import org.apache.http.util.EntityUtils;
-import org.apache.streams.components.http.HttpProcessorConfiguration;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.extensions.ExtensionUtil;
-import org.apache.streams.pojo.json.ActivityObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,230 +52,249 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Processor retrieves contents from an known url and stores the resulting 
object in an extension field
+ * Processor retrieves contents from a known url and stores the resulting 
object in an extension field.
  */
 public class SimpleHTTPGetProcessor implements StreamsProcessor {
 
-    private final static String STREAMS_ID = "SimpleHTTPGetProcessor";
+  private static final String STREAMS_ID = "SimpleHTTPGetProcessor";
 
-    // from root config id
-    private final static String EXTENSION = "account_type";
+  // from root config id
+  private static final String EXTENSION = "account_type";
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(SimpleHTTPGetProcessor.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SimpleHTTPGetProcessor.class);
 
-    protected ObjectMapper mapper;
+  protected ObjectMapper mapper;
 
-    protected URIBuilder uriBuilder;
+  protected URIBuilder uriBuilder;
 
-    protected CloseableHttpClient httpclient;
+  protected CloseableHttpClient httpclient;
 
-    protected HttpProcessorConfiguration configuration;
+  protected HttpProcessorConfiguration configuration;
 
-    protected String authHeader;
-    public SimpleHTTPGetProcessor() {
-        this(new ComponentConfigurator<>(HttpProcessorConfiguration.class)
-          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("http")));
-    }
+  protected String authHeader;
 
-    public SimpleHTTPGetProcessor(HttpProcessorConfiguration 
processorConfiguration) {
-        LOGGER.info("creating SimpleHTTPGetProcessor");
-        LOGGER.info(processorConfiguration.toString());
-        this.configuration = processorConfiguration;
-    }
+  /**
+   * SimpleHTTPGetProcessor constructor - resolves HttpProcessorConfiguration 
from JVM 'http'.
+   */
+  public SimpleHTTPGetProcessor() {
+    this(new ComponentConfigurator<>(HttpProcessorConfiguration.class)
+        
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("http")));
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  /**
+   * SimpleHTTPGetProcessor constructor - uses provided 
HttpProcessorConfiguration.
+   */
+  public SimpleHTTPGetProcessor(HttpProcessorConfiguration 
processorConfiguration) {
+    LOGGER.info("creating SimpleHTTPGetProcessor");
+    LOGGER.info(processorConfiguration.toString());
+    this.configuration = processorConfiguration;
+  }
 
-    /**
-     Override this to store a result other than exact json representation of 
response
-     */
-    protected ObjectNode prepareExtensionFragment(String entityString) {
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-        try {
-            return mapper.readValue(entityString, ObjectNode.class);
-        } catch (IOException e) {
-            LOGGER.warn(e.getMessage());
-            return null;
-        }
-    }
-
-    /**
-     Override this to place result in non-standard location on document
-     */
-    protected ObjectNode getRootDocument(StreamsDatum datum) {
-
-        try {
-            String json = datum.getDocument() instanceof String ?
-                    (String) datum.getDocument() :
-                    mapper.writeValueAsString(datum.getDocument());
-            return mapper.readValue(json, ObjectNode.class);
-        } catch (JsonProcessingException e) {
-            LOGGER.warn(e.getMessage());
-            return null;
-        } catch (IOException e) {
-            LOGGER.warn(e.getMessage());
-            return null;
-        }
+  /**
+   Override this to store a result other than exact json representation of 
response.
+   */
+  protected ObjectNode prepareExtensionFragment(String entityString) {
 
+    try {
+      return mapper.readValue(entityString, ObjectNode.class);
+    } catch (IOException ex) {
+      LOGGER.warn(ex.getMessage());
+      return null;
+    }
+  }
+
+  /**
+   Override this to change how the root document is read from the datum.
+   */
+  protected ObjectNode getRootDocument(StreamsDatum datum) {
+
+    try {
+      String json = datum.getDocument() instanceof String
+          ? (String) datum.getDocument()
+          : mapper.writeValueAsString(datum.getDocument());
+      return mapper.readValue(json, ObjectNode.class);
+    } catch (JsonProcessingException ex) {
+      LOGGER.warn(ex.getMessage());
+      return null;
+    } catch (IOException ex) {
+      LOGGER.warn(ex.getMessage());
+      return null;
     }
 
-    /**
-     Override this to place result in non-standard location on document
-     */
-    protected ActivityObject getEntityToExtend(ObjectNode rootDocument) {
+  }
 
-        if( 
this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY))
-            return mapper.convertValue(rootDocument, ActivityObject.class);
-        else
-            return 
mapper.convertValue(rootDocument.get(this.configuration.getEntity().toString()),
 ActivityObject.class);
+  /**
+   Override this to place result in non-standard location on document.
+   */
+  protected ActivityObject getEntityToExtend(ObjectNode rootDocument) {
 
+    if ( 
this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY))
 {
+      return mapper.convertValue(rootDocument, ActivityObject.class);
+    } else {
+      return 
mapper.convertValue(rootDocument.get(this.configuration.getEntity().toString()),
 ActivityObject.class);
     }
+  }
 
-    /**
-     Override this to place result in non-standard location on document
-     */
-    protected ObjectNode setEntityToExtend(ObjectNode rootDocument, 
ActivityObject activityObject) {
+  /**
+   Override this to place result in non-standard location on document.
+   */
+  protected ObjectNode setEntityToExtend(ObjectNode rootDocument, 
ActivityObject activityObject) {
 
-        if( 
this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY))
-            return mapper.convertValue(activityObject, ObjectNode.class);
-        else
-            rootDocument.set(this.configuration.getEntity().toString(), 
mapper.convertValue(activityObject, ObjectNode.class));
+    if ( 
this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY))
 {
+      return mapper.convertValue(activityObject, ObjectNode.class);
+    } else {
+      rootDocument.set(this.configuration.getEntity().toString(), 
mapper.convertValue(activityObject, ObjectNode.class));
+    }
 
-        return rootDocument;
+    return rootDocument;
 
-    }
+  }
 
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
 
-        List<StreamsDatum> result = new ArrayList<>();
+    List<StreamsDatum> result = new ArrayList<>();
 
-        ObjectNode rootDocument = getRootDocument(entry);
+    ObjectNode rootDocument = getRootDocument(entry);
 
-        Map<String, String> params = prepareParams(entry);
+    Map<String, String> params = prepareParams(entry);
 
-        URI uri = prepareURI(params);
+    URI uri = prepareURI(params);
 
-        HttpGet httpget = prepareHttpGet(uri);
+    HttpGet httpget = prepareHttpGet(uri);
 
-        CloseableHttpResponse response = null;
+    CloseableHttpResponse response = null;
 
-        String entityString = null;
-        try {
-            response = httpclient.execute(httpget);
-            HttpEntity entity = response.getEntity();
-            // TODO: handle retry
-            if (response.getStatusLine().getStatusCode() == 200 && entity != 
null) {
-                entityString = EntityUtils.toString(entity);
-            }
-        } catch (IOException e) {
-            LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, 
e.getMessage());
-            return result;
-        } finally {
-            try {
-                if (response != null) {
-                    response.close();
-                }
-            } catch (IOException ignored) {}
+    String entityString = null;
+    try {
+      response = httpclient.execute(httpget);
+      HttpEntity entity = response.getEntity();
+      // TODO: handle retry
+      if (response.getStatusLine().getStatusCode() == 200 && entity != null) {
+        entityString = EntityUtils.toString(entity);
+      }
+    } catch (IOException ex) {
+      LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, 
ex.getMessage());
+      return result;
+    } finally {
+      try {
+        if (response != null) {
+          response.close();
         }
+      } catch (IOException ignored) {
+        LOGGER.trace("IOException", ignored);
+      }
+    }
 
-        if( entityString == null )
-            return result;
-
-        LOGGER.debug(entityString);
+    if( entityString == null ) {
+      return result;
+    }
 
-        ObjectNode extensionFragment = prepareExtensionFragment(entityString);
+    LOGGER.debug(entityString);
 
-        ActivityObject extensionEntity = getEntityToExtend(rootDocument);
+    ObjectNode extensionFragment = prepareExtensionFragment(entityString);
 
-        ExtensionUtil.getInstance().addExtension(extensionEntity, 
this.configuration.getExtension(), extensionFragment);
+    ActivityObject extensionEntity = getEntityToExtend(rootDocument);
 
-        rootDocument = setEntityToExtend(rootDocument, extensionEntity);
+    ExtensionUtil.getInstance().addExtension(extensionEntity, 
this.configuration.getExtension(), extensionFragment);
 
-        entry.setDocument(rootDocument);
+    rootDocument = setEntityToExtend(rootDocument, extensionEntity);
 
-        result.add(entry);
+    entry.setDocument(rootDocument);
 
-        return result;
+    result.add(entry);
 
-    }
+    return result;
 
-    /**
-     Override this to alter request URI
-     */
-    protected URI prepareURI(Map<String, String> params) {
+  }
 
-        URI uri = null;
-        for( Map.Entry<String,String> param : params.entrySet()) {
-            uriBuilder = uriBuilder.setParameter(param.getKey(), 
param.getValue());
-        }
-        try {
-            uri = uriBuilder.build();
-        } catch (URISyntaxException e) {
-            LOGGER.error("URI error {}", uriBuilder.toString());
-        }
-        return uri;
-    }
+  /**
+   Override this to alter request URI.
+   */
+  protected URI prepareURI(Map<String, String> params) {
 
-    /**
-     Override this to add parameters to the request
-     */
-    protected Map<String, String> prepareParams(StreamsDatum entry) {
-        return new HashMap<>();
+    URI uri = null;
+    for ( Map.Entry<String,String> param : params.entrySet()) {
+      uriBuilder = uriBuilder.setParameter(param.getKey(), param.getValue());
     }
-
-    /**
-     Override this to set a payload on the request
-     */
-    protected ObjectNode preparePayload(StreamsDatum entry) {
-        return null;
+    try {
+      uri = uriBuilder.build();
+    } catch (URISyntaxException ex) {
+      LOGGER.error("URI error {}", uriBuilder.toString());
     }
-
-    public HttpGet prepareHttpGet(URI uri) {
-        HttpGet httpget = new HttpGet(uri);
-        httpget.addHeader("content-type", this.configuration.getContentType());
-        if( !Strings.isNullOrEmpty(authHeader))
-            httpget.addHeader("Authorization", String.format("Basic %s", 
authHeader));
-        return httpget;
+    return uri;
+  }
+
+  /**
+   Override this to add parameters to the request.
+   */
+  protected Map<String, String> prepareParams(StreamsDatum entry) {
+    return new HashMap<>();
+  }
+
+  /**
+   Override this to set a payload on the request.
+   */
+  protected ObjectNode preparePayload(StreamsDatum entry) {
+    return null;
+  }
+
+  /**
+   * Override this to set the URI for the request or modify headers.
+   * @param uri uri
+   * @return result
+   */
+  public HttpGet prepareHttpGet(URI uri) {
+    HttpGet httpget = new HttpGet(uri);
+    httpget.addHeader("content-type", this.configuration.getContentType());
+    if ( !Strings.isNullOrEmpty(authHeader)) {
+      httpget.addHeader("Authorization", String.format("Basic %s", 
authHeader));
     }
+    return httpget;
+  }
 
-    @Override
-    public void prepare(Object configurationObject) {
+  @Override
+  public void prepare(Object configurationObject) {
 
-        mapper = StreamsJacksonMapper.getInstance();
+    mapper = StreamsJacksonMapper.getInstance();
 
-        uriBuilder = new URIBuilder()
-            .setScheme(this.configuration.getProtocol())
-            .setHost(this.configuration.getHostname())
-            .setPath(this.configuration.getResourcePath());
+    uriBuilder = new URIBuilder()
+        .setScheme(this.configuration.getProtocol())
+        .setHost(this.configuration.getHostname())
+        .setPath(this.configuration.getResourcePath());
 
-        if( !Strings.isNullOrEmpty(configuration.getAccessToken()) )
-            uriBuilder = uriBuilder.addParameter("access_token", 
configuration.getAccessToken());
-        if( !Strings.isNullOrEmpty(configuration.getUsername())
-            && !Strings.isNullOrEmpty(configuration.getPassword())) {
-            String string = configuration.getUsername() + ":" + 
configuration.getPassword();
-            authHeader = Base64.encodeBase64String(string.getBytes());
-        }
-        httpclient = HttpClients.createDefault();
+    if ( !Strings.isNullOrEmpty(configuration.getAccessToken()) ) {
+      uriBuilder = uriBuilder.addParameter("access_token", 
configuration.getAccessToken());
     }
-
-    @Override
-    public void cleanUp() {
-        LOGGER.info("shutting down SimpleHTTPGetProcessor");
-        try {
-            httpclient.close();
-        } catch (IOException e) {
-            e.printStackTrace();
-        } finally {
-            try {
-                httpclient.close();
-            } catch (IOException e) {
-                e.printStackTrace();
-            } finally {
-                httpclient = null;
-            }
-        }
+    if ( !Strings.isNullOrEmpty(configuration.getUsername())
+         &&
+         !Strings.isNullOrEmpty(configuration.getPassword())) {
+      String string = configuration.getUsername() + ":" + 
configuration.getPassword();
+      authHeader = Base64.encodeBase64String(string.getBytes());
+    }
+    httpclient = HttpClients.createDefault();
+  }
+
+  @Override
+  public void cleanUp() {
+    LOGGER.info("shutting down SimpleHTTPGetProcessor");
+    try {
+      httpclient.close();
+    } catch (IOException ex) {
+      ex.printStackTrace();
+    } finally {
+      try {
+        httpclient.close();
+      } catch (IOException e2) {
+        e2.printStackTrace();
+      } finally {
+        httpclient = null;
+      }
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java
 
b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java
index f6089f6..1d52b5c 100644
--- 
a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java
+++ 
b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/processor/SimpleHTTPPostProcessor.java
@@ -52,225 +52,241 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Processor retrieves contents from an known url and stores the resulting 
object in an extension field
+ * Processor retrieves contents from a known url and stores the resulting 
object in an extension field.
  */
 public class SimpleHTTPPostProcessor implements StreamsProcessor {
 
-    private final static String STREAMS_ID = "SimpleHTTPPostProcessor";
+  private static final String STREAMS_ID = "SimpleHTTPPostProcessor";
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(SimpleHTTPPostProcessor.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SimpleHTTPPostProcessor.class);
 
-    protected ObjectMapper mapper;
+  protected ObjectMapper mapper;
 
-    protected URIBuilder uriBuilder;
+  protected URIBuilder uriBuilder;
 
-    protected CloseableHttpClient httpclient;
+  protected CloseableHttpClient httpclient;
 
-    protected HttpProcessorConfiguration configuration;
+  protected HttpProcessorConfiguration configuration;
 
-    protected String authHeader;
+  protected String authHeader;
 
-    public SimpleHTTPPostProcessor() {
-        this(new ComponentConfigurator<>(HttpProcessorConfiguration.class)
-          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("http")));
-    }
+  /**
+   * SimpleHTTPPostProcessor constructor - resolves HttpProcessorConfiguration 
from JVM 'http'.
+   */
+  public SimpleHTTPPostProcessor() {
+    this(new ComponentConfigurator<>(HttpProcessorConfiguration.class)
+        
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("http")));
+  }
 
-    public SimpleHTTPPostProcessor(HttpProcessorConfiguration 
processorConfiguration) {
-        LOGGER.info("creating SimpleHTTPPostProcessor");
-        LOGGER.info(processorConfiguration.toString());
-        this.configuration = processorConfiguration;
-    }
+  /**
+   * SimpleHTTPPostProcessor constructor - uses provided 
HttpProcessorConfiguration.
+   */
+  public SimpleHTTPPostProcessor(HttpProcessorConfiguration 
processorConfiguration) {
+    LOGGER.info("creating SimpleHTTPPostProcessor");
+    LOGGER.info(processorConfiguration.toString());
+    this.configuration = processorConfiguration;
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    /**
-     Override this to store a result other than exact json representation of 
response
-     */
-    protected ObjectNode prepareExtensionFragment(String entityString) {
+  /**
+   Override this to store a result other than exact json representation of 
response.
+   */
+  protected ObjectNode prepareExtensionFragment(String entityString) {
 
-        try {
-            return mapper.readValue(entityString, ObjectNode.class);
-        } catch (IOException e) {
-            LOGGER.warn("IOException", e);
-            return null;
-        }
+    try {
+      return mapper.readValue(entityString, ObjectNode.class);
+    } catch (IOException ex) {
+      LOGGER.warn("IOException", ex);
+      return null;
     }
-
-    /**
-     Override this to place result in non-standard location on document
-     */
-    protected ObjectNode getRootDocument(StreamsDatum datum) {
-
-        try {
-            String json = datum.getDocument() instanceof String ?
-                    (String) datum.getDocument() :
-                    mapper.writeValueAsString(datum.getDocument());
-            return mapper.readValue(json, ObjectNode.class);
-        } catch (JsonProcessingException e) {
-            LOGGER.warn("JsonProcessingException", e);
-            return null;
-        } catch (IOException e) {
-            LOGGER.warn("IOException", e);
-            return null;
-        }
-
+  }
+
+  /**
+   Override this to change how the root document is read from the datum.
+   */
+  protected ObjectNode getRootDocument(StreamsDatum datum) {
+
+    try {
+      String json = datum.getDocument() instanceof String
+          ? (String) datum.getDocument()
+          : mapper.writeValueAsString(datum.getDocument());
+      return mapper.readValue(json, ObjectNode.class);
+    } catch (JsonProcessingException ex) {
+      LOGGER.warn("JsonProcessingException", ex);
+      return null;
+    } catch (IOException ex) {
+      LOGGER.warn("IOException", ex);
+      return null;
     }
 
-    /**
-     Override this to place result in non-standard location on document
-     */
-    protected ActivityObject getEntityToExtend(ObjectNode rootDocument) {
+  }
 
-        if( 
this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY))
-            return mapper.convertValue(rootDocument, ActivityObject.class);
-        else
-            return 
mapper.convertValue(rootDocument.get(this.configuration.getEntity().toString()),
 ActivityObject.class);
+  /**
+   Override this to place result in non-standard location on document.
+   */
+  protected ActivityObject getEntityToExtend(ObjectNode rootDocument) {
 
+    if ( 
this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY))
 {
+      return mapper.convertValue(rootDocument, ActivityObject.class);
+    } else {
+      return 
mapper.convertValue(rootDocument.get(this.configuration.getEntity().toString()),
 ActivityObject.class);
     }
+  }
 
-    /**
-     Override this to place result in non-standard location on document
-     */
-    protected ObjectNode setEntityToExtend(ObjectNode rootDocument, 
ActivityObject activityObject) {
-
-        if( 
this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY))
-            return mapper.convertValue(activityObject, ObjectNode.class);
-        else
-            rootDocument.set(this.configuration.getEntity().toString(), 
mapper.convertValue(activityObject, ObjectNode.class));
-
-        return rootDocument;
+  /**
+   Override this to place result in non-standard location on document.
+   */
+  protected ObjectNode setEntityToExtend(ObjectNode rootDocument, 
ActivityObject activityObject) {
 
+    if ( 
this.configuration.getEntity().equals(HttpProcessorConfiguration.Entity.ACTIVITY))
 {
+      return mapper.convertValue(activityObject, ObjectNode.class);
+    } else {
+      rootDocument.set(this.configuration.getEntity().toString(), 
mapper.convertValue(activityObject, ObjectNode.class));
     }
+    return rootDocument;
 
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
+  }
 
-        List<StreamsDatum> result = new ArrayList<>();
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
 
-        ObjectNode rootDocument = getRootDocument(entry);
+    List<StreamsDatum> result = new ArrayList<>();
 
-        Map<String, String> params = prepareParams(entry);
+    ObjectNode rootDocument = getRootDocument(entry);
 
-        URI uri;
-        for( Map.Entry<String,String> param : params.entrySet()) {
-            uriBuilder = uriBuilder.setParameter(param.getKey(), 
param.getValue());
-        }
-        try {
-            uri = uriBuilder.build();
-        } catch (URISyntaxException e) {
-            LOGGER.error("URI error {}", uriBuilder.toString(), e);
-            return result;
-        }
+    Map<String, String> params = prepareParams(entry);
 
-        HttpEntity payload = preparePayload(entry);
-
-        HttpPost httpPost = prepareHttpPost(uri, payload);
-
-        CloseableHttpResponse response = null;
-
-        String entityString = null;
-        try {
-            response = httpclient.execute(httpPost);
-            HttpEntity entity = response.getEntity();
-            // TODO: handle retry
-            if (response.getStatusLine().getStatusCode() == 200 && entity != 
null) {
-                entityString = EntityUtils.toString(entity);
-            }
-        } catch (IOException e) {
-            LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, e);
-            return result;
-        } finally {
-            try {
-                if (response != null) {
-                    response.close();
-                }
-            } catch (IOException ignored) {}
-        }
+    URI uri;
+    for ( Map.Entry<String,String> param : params.entrySet() ) {
+      uriBuilder = uriBuilder.setParameter(param.getKey(), param.getValue());
+    }
+    try {
+      uri = uriBuilder.build();
+    } catch (URISyntaxException ex) {
+      LOGGER.error("URI error {}", uriBuilder.toString(), ex);
+      return result;
+    }
 
-        if( entityString == null )
-            return result;
+    HttpEntity payload = preparePayload(entry);
+
+    HttpPost httpPost = prepareHttpPost(uri, payload);
+
+    CloseableHttpResponse response = null;
+
+    String entityString = null;
+    try {
+      response = httpclient.execute(httpPost);
+      HttpEntity entity = response.getEntity();
+      // TODO: handle retry
+      if (response.getStatusLine().getStatusCode() == 200 && entity != null) {
+        entityString = EntityUtils.toString(entity);
+      }
+    } catch (IOException ex) {
+      LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, ex);
+      return result;
+    } finally {
+      try {
+        if (response != null) {
+          response.close();
+        }
+      } catch (IOException ignored) {
+        LOGGER.trace("IOException", ignored);
+      }
+    }
 
-        LOGGER.debug(entityString);
+    if ( entityString == null ) {
+      return result;
+    }
 
-        ObjectNode extensionFragment = prepareExtensionFragment(entityString);
+    LOGGER.debug(entityString);
 
-        ActivityObject extensionEntity = getEntityToExtend(rootDocument);
+    ObjectNode extensionFragment = prepareExtensionFragment(entityString);
 
-        ExtensionUtil.getInstance().addExtension(extensionEntity, 
this.configuration.getExtension(), extensionFragment);
+    ActivityObject extensionEntity = getEntityToExtend(rootDocument);
 
-        rootDocument = setEntityToExtend(rootDocument, extensionEntity);
+    ExtensionUtil.getInstance().addExtension(extensionEntity, 
this.configuration.getExtension(), extensionFragment);
 
-        entry.setDocument(rootDocument);
+    rootDocument = setEntityToExtend(rootDocument, extensionEntity);
 
-        result.add(entry);
+    entry.setDocument(rootDocument);
 
-        return result;
+    result.add(entry);
 
-    }
+    return result;
 
-    /**
-     Override this to add parameters to the request
-     */
-    protected Map<String, String> prepareParams(StreamsDatum entry) {
-        return new HashMap<>();
-    }
+  }
 
-    /**
-     Override this to add parameters to the request
-     */
-    protected HttpEntity preparePayload(StreamsDatum entry) {
-        return new StringEntity("{}",
-                ContentType.create("application/json"));
-    }
+  /**
+   Override this to add parameters to the request.
+   */
+  protected Map<String, String> prepareParams(StreamsDatum entry) {
+    return new HashMap<>();
+  }
 
+  /**
+   Override this to set a payload on the request.
+   */
+  protected HttpEntity preparePayload(StreamsDatum entry) {
+    return new StringEntity("{}",
+        ContentType.create("application/json"));
+  }
 
-    public HttpPost prepareHttpPost(URI uri, HttpEntity entity) {
-        HttpPost httpPost = new HttpPost(uri);
-        httpPost.addHeader("content-type", 
this.configuration.getContentType());
-        if( !Strings.isNullOrEmpty(authHeader))
-            httpPost.addHeader("Authorization", String.format("Basic %s", 
authHeader));
-        httpPost.setEntity(entity);
-        return httpPost;
+  /**
+   * Override this to set the URI / entity for the request or modify headers.
+   * @param uri uri
+   * @param entity entity
+   * @return result
+   */
+  public HttpPost prepareHttpPost(URI uri, HttpEntity entity) {
+    HttpPost httpPost = new HttpPost(uri);
+    httpPost.addHeader("content-type", this.configuration.getContentType());
+    if ( !Strings.isNullOrEmpty(authHeader)) {
+      httpPost.addHeader("Authorization", String.format("Basic %s", 
authHeader));
     }
+    httpPost.setEntity(entity);
+    return httpPost;
+  }
 
-    @Override
-    public void prepare(Object configurationObject) {
+  @Override
+  public void prepare(Object configurationObject) {
 
-        mapper = StreamsJacksonMapper.getInstance();
+    mapper = StreamsJacksonMapper.getInstance();
 
-        uriBuilder = new URIBuilder()
-            .setScheme(this.configuration.getProtocol())
-            .setHost(this.configuration.getHostname())
-            .setPath(this.configuration.getResourcePath());
+    uriBuilder = new URIBuilder()
+        .setScheme(this.configuration.getProtocol())
+        .setHost(this.configuration.getHostname())
+        .setPath(this.configuration.getResourcePath());
 
-        if( !Strings.isNullOrEmpty(configuration.getAccessToken()) )
-            uriBuilder = uriBuilder.addParameter("access_token", 
configuration.getAccessToken());
-        if( !Strings.isNullOrEmpty(configuration.getUsername())
-            && !Strings.isNullOrEmpty(configuration.getPassword())) {
-            String string = configuration.getUsername() + ":" + 
configuration.getPassword();
-            authHeader = Base64.encodeBase64String(string.getBytes());
-        }
-        httpclient = HttpClients.createDefault();
+    if ( !Strings.isNullOrEmpty(configuration.getAccessToken()) ) {
+      uriBuilder = uriBuilder.addParameter("access_token", 
configuration.getAccessToken());
     }
-
-    @Override
-    public void cleanUp() {
-        LOGGER.info("shutting down SimpleHTTPPostProcessor");
-        try {
-            httpclient.close();
-        } catch (IOException e) {
-            e.printStackTrace();
-        } finally {
-            try {
-                httpclient.close();
-            } catch (IOException e) {
-                LOGGER.error("IOException", e);
-            } finally {
-                httpclient = null;
-            }
-        }
+    if ( !Strings.isNullOrEmpty(configuration.getUsername())
+        && !Strings.isNullOrEmpty(configuration.getPassword())) {
+      String string = configuration.getUsername() + ":" + 
configuration.getPassword();
+      authHeader = Base64.encodeBase64String(string.getBytes());
+    }
+    httpclient = HttpClients.createDefault();
+  }
+
+  @Override
+  public void cleanUp() {
+    LOGGER.info("shutting down SimpleHTTPPostProcessor");
+    try {
+      httpclient.close();
+    } catch (IOException ex) {
+      ex.printStackTrace();
+    } finally {
+      try {
+        httpclient.close();
+      } catch (IOException e2) {
+        LOGGER.error("IOException", e2);
+      } finally {
+        httpclient = null;
+      }
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java
 
b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java
index 2078647..ab11a68 100644
--- 
a/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java
+++ 
b/streams-components/streams-http/src/main/java/org/apache/streams/components/http/provider/SimpleHttpProvider.java
@@ -69,269 +69,287 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
- * Provider retrieves contents from an known set of urls and passes all 
resulting objects downstream
+ * Provider retrieves contents from a known set of urls and passes all 
resulting objects downstream.
  */
 public class SimpleHttpProvider implements StreamsProvider {
 
-    private final static String STREAMS_ID = "SimpleHttpProvider";
-
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(SimpleHttpProvider.class);
-
-    protected ObjectMapper mapper;
-
-    protected URIBuilder uriBuilder;
-
-    protected CloseableHttpClient httpclient;
+  private static final String STREAMS_ID = "SimpleHttpProvider";
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SimpleHttpProvider.class);
+
+  protected ObjectMapper mapper;
+
+  protected URIBuilder uriBuilder;
+
+  protected CloseableHttpClient httpclient;
+
+  protected HttpProviderConfiguration configuration;
+
+  protected volatile Queue<StreamsDatum> providerQueue = new 
ConcurrentLinkedQueue<>();
+
+  protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+  private ExecutorService executor;
+
+  /**
+   * SimpleHttpProvider constructor - resolves HttpProviderConfiguration from 
JVM 'http'.
+   */
+  public SimpleHttpProvider() {
+    this(new ComponentConfigurator<>(HttpProviderConfiguration.class)
+        
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("http")));
+  }
+
+  /**
+   * SimpleHttpProvider constructor - uses provided HttpProviderConfiguration.
+   */
+  public SimpleHttpProvider(HttpProviderConfiguration providerConfiguration) {
+    LOGGER.info("creating SimpleHttpProvider");
+    LOGGER.info(providerConfiguration.toString());
+    this.configuration = providerConfiguration;
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  /**
+   Override this to add parameters to the request.
+   */
+  protected Map<String, String> prepareParams(StreamsDatum entry) {
+    return new HashMap<>();
+  }
+
+  /**
+   * prepareHttpRequest
+   * @param uri uri
+   * @return result
+   */
+  public HttpRequestBase prepareHttpRequest(URI uri) {
+    HttpRequestBase request;
+    if ( 
configuration.getRequestMethod().equals(HttpProviderConfiguration.RequestMethod.GET))
 {
+      request = new HttpGet(uri);
+    } else if ( 
configuration.getRequestMethod().equals(HttpProviderConfiguration.RequestMethod.POST))
 {
+      request = new HttpPost(uri);
+    } else {
+      // this shouldn't happen because of the default
+      request = new HttpGet(uri);
+    }
 
-    protected HttpProviderConfiguration configuration;
+    request.addHeader("content-type", this.configuration.getContentType());
 
-    protected volatile Queue<StreamsDatum> providerQueue = new 
ConcurrentLinkedQueue<>();
+    return request;
 
-    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+  }
 
-    private ExecutorService executor;
+  @Override
+  public void prepare(Object configurationObject) {
 
-    public SimpleHttpProvider() {
-        this(new ComponentConfigurator<>(HttpProviderConfiguration.class)
-          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("http")));
-    }
+    mapper = StreamsJacksonMapper.getInstance();
 
-    public SimpleHttpProvider(HttpProviderConfiguration providerConfiguration) 
{
-        LOGGER.info("creating SimpleHttpProvider");
-        LOGGER.info(providerConfiguration.toString());
-        this.configuration = providerConfiguration;
-    }
+    uriBuilder = new URIBuilder()
+        .setScheme(this.configuration.getProtocol())
+        .setHost(this.configuration.getHostname())
+        .setPort(this.configuration.getPort().intValue())
+        .setPath(this.configuration.getResourcePath());
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
+    SSLContextBuilder builder = new SSLContextBuilder();
+    SSLConnectionSocketFactory sslsf = null;
+    try {
+      builder.loadTrustMaterial(null, new TrustSelfSignedStrategy());
+      sslsf = new SSLConnectionSocketFactory(
+          builder.build(), 
SSLConnectionSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
+    } catch (NoSuchAlgorithmException | KeyManagementException | 
KeyStoreException ex) {
+      LOGGER.warn(ex.getMessage());
     }
 
-    /**
-      Override this to add parameters to the request
-     */
-    protected Map<String, String> prepareParams(StreamsDatum entry) {
-        return new HashMap<>();
+    httpclient = HttpClients.custom().setSSLSocketFactory(
+        sslsf).build();
+
+    executor = Executors.newSingleThreadExecutor();
+
+  }
+
+  @Override
+  public void cleanUp() {
+
+    LOGGER.info("shutting down SimpleHttpProvider");
+    this.shutdownAndAwaitTermination(executor);
+    try {
+      httpclient.close();
+    } catch (IOException ex) {
+      ex.printStackTrace();
+    } finally {
+      try {
+        httpclient.close();
+      } catch (IOException ex) {
+        ex.printStackTrace();
+      } finally {
+        httpclient = null;
+      }
     }
+  }
 
-    public HttpRequestBase prepareHttpRequest(URI uri) {
-        HttpRequestBase request;
-        if( 
configuration.getRequestMethod().equals(HttpProviderConfiguration.RequestMethod.GET))
 {
-            request = new HttpGet(uri);
-        } else if( 
configuration.getRequestMethod().equals(HttpProviderConfiguration.RequestMethod.POST))
 {
-            request = new HttpPost(uri);
-        } else {
-            // this shouldn't happen because of the default
-            request = new HttpGet(uri);
-        }
-
-        request.addHeader("content-type", this.configuration.getContentType());
+  @Override
+  public void startStream() {
 
-        return request;
+    executor.execute(new Runnable() {
+      @Override
+      public void run() {
 
-    }
+        readCurrent();
 
-    @Override
-    public void prepare(Object configurationObject) {
-
-        mapper = StreamsJacksonMapper.getInstance();
-
-        uriBuilder = new URIBuilder()
-            .setScheme(this.configuration.getProtocol())
-            .setHost(this.configuration.getHostname())
-            .setPort(this.configuration.getPort().intValue())
-            .setPath(this.configuration.getResourcePath());
-
-        SSLContextBuilder builder = new SSLContextBuilder();
-        SSLConnectionSocketFactory sslsf = null;
-        try {
-            builder.loadTrustMaterial(null, new TrustSelfSignedStrategy());
-            sslsf = new SSLConnectionSocketFactory(
-                    builder.build(), 
SSLConnectionSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER);
-        } catch (NoSuchAlgorithmException | KeyManagementException | 
KeyStoreException e) {
-            LOGGER.warn(e.getMessage());
-        }
+        Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
 
-        httpclient = HttpClients.custom().setSSLSocketFactory(
-                sslsf).build();
+      }
+    });
+  }
 
-        executor = Executors.newSingleThreadExecutor();
+  @Override
+  public StreamsResultSet readCurrent() {
+    StreamsResultSet current;
 
-    }
+    uriBuilder = uriBuilder.setPath(
+        Joiner.on("/").skipNulls().join(uriBuilder.getPath(), 
configuration.getResource(), configuration.getResourcePostfix())
+    );
 
-    @Override
-    public void cleanUp() {
-
-        LOGGER.info("shutting down SimpleHttpProvider");
-        this.shutdownAndAwaitTermination(executor);
-        try {
-            httpclient.close();
-        } catch (IOException e) {
-            e.printStackTrace();
-        } finally {
-            try {
-                httpclient.close();
-            } catch (IOException e) {
-                e.printStackTrace();
-            } finally {
-                httpclient = null;
-            }
-        }
+    URI uri;
+    try {
+      uri = uriBuilder.build();
+    } catch (URISyntaxException ex) {
+      uri = null;
     }
 
-    @Override
-    public void startStream() {
-
-        executor.execute(new Runnable() {
-            @Override
-            public void run() {
+    List<ObjectNode> results = execute(uri);
 
-                readCurrent();
+    lock.writeLock().lock();
 
-                Uninterruptibles.sleepUninterruptibly(5, TimeUnit.SECONDS);
-
-            }
-        });
+    for ( ObjectNode item : results ) {
+      providerQueue.add(newDatum(item));
     }
 
-    @Override
-    public StreamsResultSet readCurrent() {
-        StreamsResultSet current;
-
-        uriBuilder = uriBuilder.setPath(
-                Joiner.on("/").skipNulls().join(uriBuilder.getPath(), 
configuration.getResource(), configuration.getResourcePostfix())
-        );
+    LOGGER.debug("Creating new result set for {} items", providerQueue.size());
+    current = new StreamsResultSet(providerQueue);
 
-        URI uri;
-        try {
-            uri = uriBuilder.build();
-        } catch (URISyntaxException e) {
-            uri = null;
-        }
-
-        List<ObjectNode> results = execute(uri);
-
-        lock.writeLock().lock();
-
-        for( ObjectNode item : results ) {
-            providerQueue.add(newDatum(item));
-        }
-
-        LOGGER.debug("Creating new result set for {} items", 
providerQueue.size());
-        current = new StreamsResultSet(providerQueue);
+    return current;
+  }
 
-        return current;
-    }
+  protected List<ObjectNode> execute(URI uri) {
 
-    protected List<ObjectNode> execute(URI uri) {
-
-        Preconditions.checkNotNull(uri);
-
-        List<ObjectNode> results = new ArrayList<>();
-
-        HttpRequestBase httpRequest = prepareHttpRequest(uri);
-
-        CloseableHttpResponse response = null;
-
-        String entityString;
-        try {
-            response = httpclient.execute(httpRequest);
-            HttpEntity entity = response.getEntity();
-            // TODO: handle retry
-            if (response.getStatusLine().getStatusCode() == 200 && entity != 
null) {
-                entityString = EntityUtils.toString(entity);
-                if( !entityString.equals("{}") && !entityString.equals("[]") ) 
{
-                    JsonNode jsonNode = mapper.readValue(entityString, 
JsonNode.class);
-                    results = parse(jsonNode);
-                }
-            }
-        } catch (IOException e) {
-            LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, 
e.getMessage());
-        } finally {
-            try {
-                if (response != null) {
-                    response.close();
-                }
-            } catch (IOException ignored) {}
-        }
-        return results;
-    }
+    Preconditions.checkNotNull(uri);
 
-    /**
-     Override this to change how entity gets converted to objects
-     */
-    protected List<ObjectNode> parse(JsonNode jsonNode) {
+    List<ObjectNode> results = new ArrayList<>();
 
-        List<ObjectNode> results = new ArrayList<>();
+    HttpRequestBase httpRequest = prepareHttpRequest(uri);
 
-        if (jsonNode != null && jsonNode instanceof ObjectNode ) {
-            results.add((ObjectNode) jsonNode);
-        } else if (jsonNode != null && jsonNode instanceof ArrayNode) {
-            ArrayNode arrayNode = (ArrayNode) jsonNode;
-            Iterator<JsonNode> iterator = arrayNode.elements();
-            while (iterator.hasNext()) {
-                ObjectNode element = (ObjectNode) iterator.next();
+    CloseableHttpResponse response = null;
 
-                results.add(element);
-            }
+    String entityString;
+    try {
+      response = httpclient.execute(httpRequest);
+      HttpEntity entity = response.getEntity();
+      // TODO: handle retry
+      if (response.getStatusLine().getStatusCode() == 200 && entity != null) {
+        entityString = EntityUtils.toString(entity);
+        if ( !entityString.equals("{}") && !entityString.equals("[]") ) {
+          JsonNode jsonNode = mapper.readValue(entityString, JsonNode.class);
+          results = parse(jsonNode);
         }
-
-        return results;
-    }
-
-    /**
-     Override this to change how metadata is derived from object
-     */
-    protected StreamsDatum newDatum(ObjectNode item) {
-        try {
-            String id = null;
-            if( item.get("id") != null )
-                id = item.get("id").asText();
-            DateTime timestamp = null;
-            if( item.get("timestamp") != null )
-                timestamp = new DateTime(item.get("timestamp").asText());
-            if( id != null && timestamp != null )
-                return new StreamsDatum(item, id, timestamp);
-            else if( id != null )
-                return new StreamsDatum(item, id);
-            else if( timestamp != null )
-                return new StreamsDatum(item, null, timestamp);
-            else return new StreamsDatum(item);
-        } catch( Exception e ) {
-            return new StreamsDatum(item);
+      }
+    } catch (IOException ex) {
+      LOGGER.error("IO error:\n{}\n{}\n{}", uri.toString(), response, 
ex.getMessage());
+    } finally {
+      try {
+        if (response != null) {
+          response.close();
         }
+      } catch (IOException ignored) {
+        LOGGER.trace("IOException", ignored);
+      }
     }
-
-    @Override
-    public StreamsResultSet readNew(BigInteger sequence) {
-        return null;
-    }
-
-    @Override
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        return null;
+    return results;
+  }
+
+  /**
+   Override this to change how entity gets converted to objects.
+   */
+  protected List<ObjectNode> parse(JsonNode jsonNode) {
+
+    List<ObjectNode> results = new ArrayList<>();
+
+    if (jsonNode != null && jsonNode instanceof ObjectNode ) {
+      results.add((ObjectNode) jsonNode);
+    } else if (jsonNode != null && jsonNode instanceof ArrayNode) {
+      ArrayNode arrayNode = (ArrayNode) jsonNode;
+      Iterator<JsonNode> iterator = arrayNode.elements();
+      while (iterator.hasNext()) {
+        ObjectNode element = (ObjectNode) iterator.next();
+
+        results.add(element);
+      }
     }
 
-    @Override
-    public boolean isRunning() {
-        return true;
+    return results;
+  }
+
+  /**
+   Override this to change how metadata is derived from object.
+   */
+  protected StreamsDatum newDatum(ObjectNode item) {
+    try {
+      String id = null;
+      if ( item.get("id") != null ) {
+        id = item.get("id").asText();
+      }
+      DateTime timestamp = null;
+      if ( item.get("timestamp") != null ) {
+        timestamp = new DateTime(item.get("timestamp").asText());
+      }
+      if ( id != null && timestamp != null ) {
+        return new StreamsDatum(item, id, timestamp);
+      } else if ( id != null ) {
+        return new StreamsDatum(item, id);
+      } else if ( timestamp != null ) {
+        return new StreamsDatum(item, null, timestamp);
+      } else {
+        return new StreamsDatum(item);
+      }
+    } catch ( Exception ex ) {
+      return new StreamsDatum(item);
     }
-
-    protected void shutdownAndAwaitTermination(ExecutorService pool) {
-        pool.shutdown(); // Disable new tasks from being submitted
-        try {
-            // Wait a while for existing tasks to terminate
-            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
-                pool.shutdownNow(); // Cancel currently executing tasks
-                // Wait a while for tasks to respond to being cancelled
-                if (!pool.awaitTermination(10, TimeUnit.SECONDS))
-                    LOGGER.error("Pool did not terminate");
-            }
-        } catch (InterruptedException ie) {
-            // (Re-)Cancel if current thread also interrupted
-            pool.shutdownNow();
-            // Preserve interrupt status
-            Thread.currentThread().interrupt();
+  }
+
+  @Override
+  public StreamsResultSet readNew(BigInteger sequence) {
+    return null;
+  }
+
+  @Override
+  public StreamsResultSet readRange(DateTime start, DateTime end) {
+    return null;
+  }
+
+  @Override
+  public boolean isRunning() {
+    return true;
+  }
+
+  protected void shutdownAndAwaitTermination(ExecutorService pool) {
+    pool.shutdown(); // Disable new tasks from being submitted
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+        pool.shutdownNow(); // Cancel currently executing tasks
+        // Wait a while for tasks to respond to being cancelled
+        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+          LOGGER.error("Pool did not terminate");
         }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if current thread also interrupted
+      pool.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-components/streams-http/src/test/java/SimpleHTTPPostPersistWriterTest.java
----------------------------------------------------------------------
diff --git 
a/streams-components/streams-http/src/test/java/SimpleHTTPPostPersistWriterTest.java
 
b/streams-components/streams-http/src/test/java/SimpleHTTPPostPersistWriterTest.java
index 55e338d..2333c4b 100644
--- 
a/streams-components/streams-http/src/test/java/SimpleHTTPPostPersistWriterTest.java
+++ 
b/streams-components/streams-http/src/test/java/SimpleHTTPPostPersistWriterTest.java
@@ -16,16 +16,18 @@
  * under the License.
  */
 
+import org.apache.streams.components.http.HttpPersistWriterConfiguration;
+import org.apache.streams.components.http.persist.SimpleHTTPPostPersistWriter;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+
 import org.apache.http.client.methods.CloseableHttpResponse;
 import org.apache.http.client.methods.HttpUriRequest;
 import org.apache.http.impl.client.CloseableHttpClient;
 import org.apache.http.impl.client.HttpClients;
-import org.apache.streams.components.http.HttpPersistWriterConfiguration;
-import org.apache.streams.components.http.persist.SimpleHTTPPostPersistWriter;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -47,87 +49,91 @@ import static org.mockito.Matchers.any;
 @PrepareForTest({HttpClients.class, CloseableHttpResponse.class, 
CloseableHttpResponse.class})
 public class SimpleHTTPPostPersistWriterTest {
 
-    private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
-
-    /**
-     * test port.
-     */
-    private static final int PORT = 18080;
-
-    /**
-     * test hosts.
-     */
-    private static final String HOSTNAME = "localhost";
-
-    /**
-     * test protocol.
-     */
-    private static final String PROTOCOL = "http";
-
-    /**
-     * CloseableHttpClient mock.
-     */
-    private CloseableHttpClient client;
-
-    /**
-     * CloseableHttpClient mock.
-     */
-    private CloseableHttpResponse response = 
Mockito.mock(CloseableHttpResponse.class);
-
-    /**
-     * Our output.
-     */
-    private ByteArrayOutputStream output;
-
-    /**
-     * Our input.
-     */
-    private ByteArrayInputStream input;
-
-    @Before
-    public void setUp() throws Exception
-    {
-        /*
-      HttpClients mock.
+  private ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+
+  /**
+   * test port.
+   */
+  private static final int PORT = 18080;
+
+  /**
+   * test hosts.
+   */
+  private static final String HOSTNAME = "localhost";
+
+  /**
+   * test protocol.
+   */
+  private static final String PROTOCOL = "http";
+
+  /**
+   * CloseableHttpClient mock.
+   */
+  private CloseableHttpClient client;
+
+  /**
+   * CloseableHttpResponse mock.
+   */
+  private CloseableHttpResponse response = 
Mockito.mock(CloseableHttpResponse.class);
+
+  /**
+   * Our output.
+   */
+  private ByteArrayOutputStream output;
+
+  /**
+   * Our input.
+   */
+  private ByteArrayInputStream input;
+
+  @Before
+  public void setUp() throws Exception
+  {
+    /*
+     HttpClients mock.
      */
-        this.client = PowerMockito.mock(CloseableHttpClient.class);
+    this.client = PowerMockito.mock(CloseableHttpClient.class);
 
-        PowerMockito.mockStatic(HttpClients.class);
+    PowerMockito.mockStatic(HttpClients.class);
 
-        PowerMockito.when(HttpClients.createDefault())
-                .thenReturn(client);
+    PowerMockito.when(HttpClients.createDefault())
+        .thenReturn(client);
 
-        PowerMockito.when(client.execute(any(HttpUriRequest.class)))
-                .thenReturn(response);
+    PowerMockito.when(client.execute(any(HttpUriRequest.class)))
+        .thenReturn(response);
 
-        Mockito.when(response.getEntity()).thenReturn(null);
-        Mockito.doNothing().when(response).close();
+    Mockito.when(response.getEntity()).thenReturn(null);
+    Mockito.doNothing().when(response).close();
 
-    }
+  }
 
-    @Test
-    public void testPersist() throws Exception
-    {
-        HttpPersistWriterConfiguration configuration = new 
HttpPersistWriterConfiguration();
-        configuration.setProtocol(PROTOCOL);
-        configuration.setHostname(HOSTNAME);
-        configuration.setPort((long) PORT);
-        configuration.setResourcePath("/");
+  /**
+   * testPersist.
+   * @throws Exception
+   */
+  @Test
+  public void testPersist() throws Exception
+  {
+    HttpPersistWriterConfiguration configuration = new 
HttpPersistWriterConfiguration();
+    configuration.setProtocol(PROTOCOL);
+    configuration.setHostname(HOSTNAME);
+    configuration.setPort((long) PORT);
+    configuration.setResourcePath("/");
 
-        /*
-      Instance under tests.
+    /*
+     Instance under test.
      */
-        SimpleHTTPPostPersistWriter writer = new 
SimpleHTTPPostPersistWriter(configuration);
+    SimpleHTTPPostPersistWriter writer = new 
SimpleHTTPPostPersistWriter(configuration);
 
-        writer.prepare(null);
+    writer.prepare(null);
 
-        StreamsDatum testDatum = new 
StreamsDatum(mapper.readValue("{\"message\":\"ping\"}", ObjectNode.class));
+    StreamsDatum testDatum = new 
StreamsDatum(mapper.readValue("{\"message\":\"ping\"}", ObjectNode.class));
 
-        writer.write(testDatum);
+    writer.write(testDatum);
 
-        Mockito.verify(this.client).execute(any(HttpUriRequest.class));
+    Mockito.verify(this.client).execute(any(HttpUriRequest.class));
 
-        Mockito.verify(this.response).close();
+    Mockito.verify(this.response).close();
 
-    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-config/src/main/java/org/apache/streams/config/ComponentConfigurator.java
----------------------------------------------------------------------
diff --git 
a/streams-config/src/main/java/org/apache/streams/config/ComponentConfigurator.java
 
b/streams-config/src/main/java/org/apache/streams/config/ComponentConfigurator.java
index 42b70a6..5eea60e 100644
--- 
a/streams-config/src/main/java/org/apache/streams/config/ComponentConfigurator.java
+++ 
b/streams-config/src/main/java/org/apache/streams/config/ComponentConfigurator.java
@@ -19,9 +19,9 @@
 package org.apache.streams.config;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.reflect.TypeToken;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigRenderOptions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -30,44 +30,64 @@ import java.io.Serializable;
 /**
  * ComponentConfigurator supplies serializable configuration beans derived 
from a specified typesafe path or object.
  *
+ * <p/>
  * Typically a component will select a 'default' typesafe path to be used if 
no other path or object is provided.
  *
+ * <p/>
  * For example, streams-persist-elasticsearch will use 'elasticsearch' by 
default, but an implementation
- *   such as github.com/w2ogroup/elasticsearch-reindex can resolve a reader 
from elasticsearch.source
- *   and a writer from elasticsearch.destination
+ *   such as github.com/apache/streams-examples/local/elasticsearch-reindex
+ *   can resolve a reader from elasticsearch.source
+ *   and a writer from elasticsearch.destination.
  *
  */
 public class ComponentConfigurator<T extends Serializable> {
 
-    private Class<T> configClass;
-    public ComponentConfigurator(Class<T> configClass) {
-        this.configClass = configClass;
-    }
+  private Class<T> configClass;
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(ComponentConfigurator.class);
+  public ComponentConfigurator(Class<T> configClass) {
+    this.configClass = configClass;
+  }
 
-    private final static ObjectMapper mapper = new ObjectMapper();
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ComponentConfigurator.class);
 
-    public T detectConfiguration(Config typesafeConfig) {
+  private static final ObjectMapper mapper = new ObjectMapper();
 
-        T pojoConfig = null;
+  /**
+   * resolve a serializable configuration pojo from a given typesafe config 
object.
+   * @param typesafeConfig typesafeConfig
+   * @return result
+   */
+  public T detectConfiguration(Config typesafeConfig) {
 
-        try {
-            pojoConfig = 
mapper.readValue(typesafeConfig.root().render(ConfigRenderOptions.concise()), 
configClass);
-        } catch (Exception e) {
-            e.printStackTrace();
-            LOGGER.warn("Could not parse:", typesafeConfig);
-        }
+    T pojoConfig = null;
 
-        return pojoConfig;
+    try {
+      pojoConfig = 
mapper.readValue(typesafeConfig.root().render(ConfigRenderOptions.concise()), 
configClass);
+    } catch (Exception ex) {
+      ex.printStackTrace();
+      LOGGER.warn("Could not parse:", typesafeConfig);
     }
 
-    public T detectConfiguration(String subConfig) {
-        Config streamsConfig = StreamsConfigurator.getConfig();
-        return detectConfiguration( streamsConfig.getConfig(subConfig));
-    }
+    return pojoConfig;
+  }
 
-    public T detectConfiguration(Config typesafeConfig, String subConfig) {
-        return detectConfiguration( typesafeConfig.getConfig(subConfig));
-    }
+  /**
+   * resolve a serializable configuration pojo from a portion of the JVM 
config object.
+   * @param subConfig subConfig
+   * @return result
+   */
+  public T detectConfiguration(String subConfig) {
+    Config streamsConfig = StreamsConfigurator.getConfig();
+    return detectConfiguration( streamsConfig.getConfig(subConfig));
+  }
+
+  /**
+   * resolve a serializable configuration pojo from a portion of a given 
typesafe config object.
+   * @param typesafeConfig typesafeConfig
+   * @param subConfig subConfig
+   * @return result
+   */
+  public T detectConfiguration(Config typesafeConfig, String subConfig) {
+    return detectConfiguration( typesafeConfig.getConfig(subConfig));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java
----------------------------------------------------------------------
diff --git 
a/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java
 
b/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java
index 6a8fb1d..319b32a 100644
--- 
a/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java
+++ 
b/streams-config/src/main/java/org/apache/streams/config/StreamsConfigurator.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 import com.typesafe.config.ConfigRenderOptions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,60 +37,60 @@ import java.net.URL;
  */
 public class StreamsConfigurator {
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(ComponentConfigurator.class);
-
-    private final static ObjectMapper mapper = new ObjectMapper();
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ComponentConfigurator.class);
 
-    /*
-        Pull all configuration files from the classpath, system properties, 
and environment variables
-     */
-    public static Config config = ConfigFactory.load();
+  private static final ObjectMapper mapper = new ObjectMapper();
 
-    public static Config getConfig() {
-        return config;
-    }
+  /*
+      Pull all configuration files from the classpath, system properties, and 
environment variables
+   */
+  public static Config config = ConfigFactory.load();
 
-    public static Config resolveConfig(String configUrl) throws 
MalformedURLException {
-        URL url = new URL(configUrl);
-        Config urlConfig = ConfigFactory.parseURL(url);
-        urlConfig.resolve();
-        config = urlConfig;
-        return config;
-    }
+  public static Config getConfig() {
+    return config;
+  }
 
+  public static Config resolveConfig(String configUrl) throws 
MalformedURLException {
+    URL url = new URL(configUrl);
+    Config urlConfig = ConfigFactory.parseURL(url);
+    urlConfig.resolve();
+    config = urlConfig;
+    return config;
+  }
 
 
-    public static StreamsConfiguration detectConfiguration() {
-        return detectConfiguration(config);
-    }
 
-    public static StreamsConfiguration detectConfiguration(Config 
typesafeConfig) {
+  public static StreamsConfiguration detectConfiguration() {
+    return detectConfiguration(config);
+  }
 
-        StreamsConfiguration pojoConfig = null;
+  public static StreamsConfiguration detectConfiguration(Config 
typesafeConfig) {
 
-        try {
-            pojoConfig = 
mapper.readValue(typesafeConfig.root().render(ConfigRenderOptions.concise()), 
StreamsConfiguration.class);
-        } catch (Exception e) {
-            e.printStackTrace();
-            LOGGER.warn("Could not parse:", typesafeConfig);
-        }
+    StreamsConfiguration pojoConfig = null;
 
-        return pojoConfig;
+    try {
+      pojoConfig = 
mapper.readValue(typesafeConfig.root().render(ConfigRenderOptions.concise()), 
StreamsConfiguration.class);
+    } catch (Exception e) {
+      e.printStackTrace();
+      LOGGER.warn("Could not parse:", typesafeConfig);
     }
 
-    public static StreamsConfiguration mergeConfigurations(Config base, Config 
delta) {
+    return pojoConfig;
+  }
 
-        Config merged = delta.withFallback(base);
+  public static StreamsConfiguration mergeConfigurations(Config base, Config 
delta) {
 
-        StreamsConfiguration pojoConfig = null;
+    Config merged = delta.withFallback(base);
 
-        try {
-            pojoConfig = 
mapper.readValue(merged.root().render(ConfigRenderOptions.concise()), 
StreamsConfiguration.class);
-        } catch (Exception e) {
-            e.printStackTrace();
-            LOGGER.warn("Failed to merge.");
-        }
+    StreamsConfiguration pojoConfig = null;
 
-        return pojoConfig;
+    try {
+      pojoConfig = 
mapper.readValue(merged.root().render(ConfigRenderOptions.concise()), 
StreamsConfiguration.class);
+    } catch (Exception e) {
+      e.printStackTrace();
+      LOGGER.warn("Failed to merge.");
     }
+
+    return pojoConfig;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-config/src/test/java/org/apache/streams/config/test/ComponentConfiguratorTest.java
----------------------------------------------------------------------
diff --git 
a/streams-config/src/test/java/org/apache/streams/config/test/ComponentConfiguratorTest.java
 
b/streams-config/src/test/java/org/apache/streams/config/test/ComponentConfiguratorTest.java
index eddfb53..82cc6bc 100644
--- 
a/streams-config/src/test/java/org/apache/streams/config/test/ComponentConfiguratorTest.java
+++ 
b/streams-config/src/test/java/org/apache/streams/config/test/ComponentConfiguratorTest.java
@@ -40,7 +40,7 @@ import org.powermock.modules.junit4.PowerMockRunner;
 @PrepareForTest(StreamsConfigurator.class)
 public class ComponentConfiguratorTest {
 
-    private final static ObjectMapper mapper = new ObjectMapper();
+    private static final ObjectMapper mapper = new ObjectMapper();
 
     @Test
     public void testDetectDefaults() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-config/src/test/java/org/apache/streams/config/test/StreamsConfiguratorTest.java
----------------------------------------------------------------------
diff --git 
a/streams-config/src/test/java/org/apache/streams/config/test/StreamsConfiguratorTest.java
 
b/streams-config/src/test/java/org/apache/streams/config/test/StreamsConfiguratorTest.java
index 65dbd75..a29d8c7 100644
--- 
a/streams-config/src/test/java/org/apache/streams/config/test/StreamsConfiguratorTest.java
+++ 
b/streams-config/src/test/java/org/apache/streams/config/test/StreamsConfiguratorTest.java
@@ -44,7 +44,7 @@ import java.util.Scanner;
  */
 public class StreamsConfiguratorTest {
 
-    private final static ObjectMapper mapper = new ObjectMapper();
+    private static final ObjectMapper mapper = new ObjectMapper();
 
     @Test
     public void testDetectConfiguration() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java
 
b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java
index fc00321..e3bfe70 100644
--- 
a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java
+++ 
b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReader.java
@@ -18,6 +18,12 @@
 
 package org.apache.streams.amazon.kinesis;
 
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistReader;
+import org.apache.streams.core.StreamsResultSet;
+
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.Protocol;
 import com.amazonaws.auth.AWSCredentials;
@@ -27,150 +33,150 @@ import com.amazonaws.regions.Regions;
 import com.amazonaws.services.kinesis.AmazonKinesisClient;
 import com.amazonaws.services.kinesis.model.DescribeStreamResult;
 import com.amazonaws.services.kinesis.model.Shard;
-import com.amazonaws.services.s3.AmazonS3Client;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Strings;
 import com.google.common.collect.Queues;
 import com.typesafe.config.Config;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.DatumStatusCounter;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsPersistReader;
-import org.apache.streams.core.StreamsResultSet;
+
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.streams.amazon.kinesis.KinesisConfiguration;
-
 import java.io.Serializable;
 import java.math.BigInteger;
 import java.util.List;
-import java.util.Properties;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
+/**
+ * KinesisPersistReader reads documents from kinesis.
+ */
 public class KinesisPersistReader implements StreamsPersistReader, 
Serializable {
 
-    public final static String STREAMS_ID = "KinesisPersistReader";
+  public static final String STREAMS_ID = "KinesisPersistReader";
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(KinesisPersistReader.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(KinesisPersistReader.class);
 
-    protected volatile Queue<StreamsDatum> persistQueue;
+  protected volatile Queue<StreamsDatum> persistQueue;
 
-    private ObjectMapper mapper = new ObjectMapper();
+  private ObjectMapper mapper = new ObjectMapper();
 
-    private KinesisReaderConfiguration config;
+  private KinesisReaderConfiguration config;
 
-    protected Long pollInterval = 
StreamsConfigurator.detectConfiguration().getBatchFrequencyMs();
+  protected Long pollInterval = 
StreamsConfigurator.detectConfiguration().getBatchFrequencyMs();
 
-    private List<String> streamNames;
+  private List<String> streamNames;
 
-    private ExecutorService executor;
+  private ExecutorService executor;
 
-    protected AmazonKinesisClient client;
+  protected AmazonKinesisClient client;
 
-    public KinesisPersistReader() {
-        Config config = StreamsConfigurator.config.getConfig("kinesis");
-        this.config = new 
ComponentConfigurator<>(KinesisReaderConfiguration.class).detectConfiguration(config);
-        this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
-    }
+  /**
+   * KinesisPersistReader constructor - resolves KinesisReaderConfiguration 
from JVM 'kinesis'.
+   */
+  public KinesisPersistReader() {
+    Config config = StreamsConfigurator.config.getConfig("kinesis");
+    this.config = new 
ComponentConfigurator<>(KinesisReaderConfiguration.class).detectConfiguration(config);
+    this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
+  }
 
-    public KinesisPersistReader(KinesisReaderConfiguration config) {
-        this.config = config;
-        this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
-    }
+  /**
+   * KinesisPersistReader constructor - uses provided 
KinesisReaderConfiguration.
+   */
+  public KinesisPersistReader(KinesisReaderConfiguration config) {
+    this.config = config;
+    this.persistQueue  = new ConcurrentLinkedQueue<StreamsDatum>();
+  }
 
-    public void setConfig(KinesisReaderConfiguration config) {
-        this.config = config;
-    }
-
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  public void setConfig(KinesisReaderConfiguration config) {
+    this.config = config;
+  }
 
-    @Override
-    public void startStream() {
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-        this.streamNames = this.config.getStreams();
+  @Override
+  public void startStream() {
 
-        for (final String stream : streamNames) {
+    this.streamNames = this.config.getStreams();
 
-            DescribeStreamResult describeStreamResult = 
client.describeStream(stream);
+    for (final String stream : streamNames) {
 
-            if( 
"ACTIVE".equals(describeStreamResult.getStreamDescription().getStreamStatus())) 
{
+      DescribeStreamResult describeStreamResult = 
client.describeStream(stream);
 
-                List<Shard> shardList = 
describeStreamResult.getStreamDescription().getShards();
+      if( 
"ACTIVE".equals(describeStreamResult.getStreamDescription().getStreamStatus())) 
{
 
-                for( Shard shard : shardList ) {
-                    executor.submit(new KinesisPersistReaderTask(this, stream, 
shard.getShardId()));
-                }
-            }
+        List<Shard> shardList = 
describeStreamResult.getStreamDescription().getShards();
 
+        for( Shard shard : shardList ) {
+          executor.submit(new KinesisPersistReaderTask(this, stream, 
shard.getShardId()));
         }
+      }
 
     }
 
-    @Override
-    public StreamsResultSet readAll() {
-        return readCurrent();
-    }
+  }
 
-    public StreamsResultSet readCurrent() {
+  @Override
+  public StreamsResultSet readAll() {
+    return readCurrent();
+  }
 
-        StreamsResultSet current;
-        synchronized( KinesisPersistReader.class ) {
-            current = new 
StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
-            persistQueue.clear();
-        }
-        return current;
-    }
+  public StreamsResultSet readCurrent() {
 
-    @Override
-    public StreamsResultSet readNew(BigInteger bigInteger) {
-        return null;
+    StreamsResultSet current;
+    synchronized( KinesisPersistReader.class ) {
+      current = new 
StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
+      persistQueue.clear();
     }
-
-    @Override
-    public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
-        return null;
+    return current;
+  }
+
+  @Override
+  public StreamsResultSet readNew(BigInteger bigInteger) {
+    return null;
+  }
+
+  @Override
+  public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
+    return null;
+  }
+
+  @Override
+  public boolean isRunning() {
+    return !executor.isShutdown() && !executor.isTerminated();
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    // Connect to Kinesis
+    synchronized (this) {
+      // Create the credentials Object
+      AWSCredentials credentials = new BasicAWSCredentials(config.getKey(), 
config.getSecretKey());
+
+      ClientConfiguration clientConfig = new ClientConfiguration();
+      
clientConfig.setProtocol(Protocol.valueOf(config.getProtocol().toString()));
+
+      this.client = new AmazonKinesisClient(credentials, clientConfig);
+      if (!Strings.isNullOrEmpty(config.getRegion()))
+        
this.client.setRegion(Region.getRegion(Regions.fromName(config.getRegion())));
     }
+    streamNames = this.config.getStreams();
+    executor = Executors.newFixedThreadPool(streamNames.size());
+  }
 
-    @Override
-    public boolean isRunning() {
-        return !executor.isShutdown() && !executor.isTerminated();
-    }
+  @Override
+  public void cleanUp() {
 
-    @Override
-    public void prepare(Object configurationObject) {
-        // Connect to Kinesis
-        synchronized (this) {
-            // Create the credentials Object
-            AWSCredentials credentials = new 
BasicAWSCredentials(config.getKey(), config.getSecretKey());
-
-            ClientConfiguration clientConfig = new ClientConfiguration();
-            
clientConfig.setProtocol(Protocol.valueOf(config.getProtocol().toString()));
-
-            this.client = new AmazonKinesisClient(credentials, clientConfig);
-            if (!Strings.isNullOrEmpty(config.getRegion()))
-                
this.client.setRegion(Region.getRegion(Regions.fromName(config.getRegion())));
-        }
-        streamNames = this.config.getStreams();
-        executor = Executors.newFixedThreadPool(streamNames.size());
-    }
-
-    @Override
-    public void cleanUp() {
-
-        while( !executor.isTerminated()) {
-            try {
-                executor.awaitTermination(5, TimeUnit.SECONDS);
-            } catch (InterruptedException e) {}
-        }
+    while( !executor.isTerminated()) {
+      try {
+        executor.awaitTermination(5, TimeUnit.SECONDS);
+      } catch (InterruptedException e) {}
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java
 
b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java
index 7753031..a93fda8 100644
--- 
a/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java
+++ 
b/streams-contrib/streams-amazon-aws/streams-persist-kinesis/src/main/java/org/apache/streams/amazon/kinesis/KinesisPersistReaderTask.java
@@ -18,94 +18,102 @@
 
 package org.apache.streams.amazon.kinesis;
 
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+
 import com.amazonaws.services.kinesis.model.GetRecordsRequest;
 import com.amazonaws.services.kinesis.model.GetRecordsResult;
 import com.amazonaws.services.kinesis.model.GetShardIteratorRequest;
 import com.amazonaws.services.kinesis.model.GetShardIteratorResult;
 import com.amazonaws.services.kinesis.model.Record;
-import com.amazonaws.util.Base64;
 import com.google.common.collect.Maps;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
+
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.math.BigInteger;
 import java.nio.charset.Charset;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Random;
 
+/**
+ * KinesisPersistReaderTask reads documents from kinesis on behalf of
+ * @see {@link KinesisPersistReader}.
+ */
 public class KinesisPersistReaderTask implements Runnable {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(KinesisPersistReaderTask.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(KinesisPersistReaderTask.class);
 
-    private KinesisPersistReader reader;
-    private String streamName;
-    private String shardId;
+  private KinesisPersistReader reader;
+  private String streamName;
+  private String shardId;
 
-    private String shardIteratorId;
+  private String shardIteratorId;
 
-    private Long pollInterval = 
StreamsConfigurator.detectConfiguration().getBatchFrequencyMs();
+  private Long pollInterval = 
StreamsConfigurator.detectConfiguration().getBatchFrequencyMs();
 
-    public KinesisPersistReaderTask(KinesisPersistReader reader, String 
streamName, String shardId) {
-        this.reader = reader;
-        this.streamName = streamName;
-        this.shardId = shardId;
-    }
+  /**
+   * KinesisPersistReaderTask constructor.
+   */
+  public KinesisPersistReaderTask(KinesisPersistReader reader, String 
streamName, String shardId) {
+    this.reader = reader;
+    this.streamName = streamName;
+    this.shardId = shardId;
+  }
 
-    @Override
-    public void run() {
+  @Override
+  public void run() {
 
-        GetShardIteratorRequest shardIteratorRequest = new 
GetShardIteratorRequest()
-                .withStreamName(this.streamName)
-                .withShardId(shardId)
-                .withShardIteratorType("TRIM_HORIZON");
+    GetShardIteratorRequest shardIteratorRequest = new 
GetShardIteratorRequest()
+        .withStreamName(this.streamName)
+        .withShardId(shardId)
+        .withShardIteratorType("TRIM_HORIZON");
 
-        GetShardIteratorResult shardIteratorResult = 
reader.client.getShardIterator(shardIteratorRequest);
+    GetShardIteratorResult shardIteratorResult = 
reader.client.getShardIterator(shardIteratorRequest);
 
-        shardIteratorId = shardIteratorResult.getShardIterator();
+    shardIteratorId = shardIteratorResult.getShardIterator();
 
-        Map<String,Object> metadata = Maps.newHashMap();
-        metadata.put("streamName", streamName);
-        metadata.put("shardId", shardId);
+    Map<String,Object> metadata = Maps.newHashMap();
+    metadata.put("streamName", streamName);
+    metadata.put("shardId", shardId);
 
-        while(true) {
+    while (true) {
 
-            GetRecordsRequest recordsRequest = new GetRecordsRequest()
-                    .withShardIterator(shardIteratorId);
+      GetRecordsRequest recordsRequest = new GetRecordsRequest()
+          .withShardIterator(shardIteratorId);
 
-            GetRecordsResult recordsResult = 
reader.client.getRecords(recordsRequest);
+      GetRecordsResult recordsResult = 
reader.client.getRecords(recordsRequest);
 
-            LOGGER.info("{} records {} millis behind {}:{}:{} ", 
recordsResult.getRecords().size(), recordsResult.getMillisBehindLatest(), 
streamName, shardId, shardIteratorId);
+      LOGGER.info("{} records {} millis behind {}:{}:{} ", 
recordsResult.getRecords().size(), recordsResult.getMillisBehindLatest(), 
streamName, shardId, shardIteratorId);
 
-            shardIteratorId = recordsResult.getNextShardIterator();
+      shardIteratorId = recordsResult.getNextShardIterator();
 
-            List<Record> recordList = recordsResult.getRecords();
+      List<Record> recordList = recordsResult.getRecords();
 
-            for (Record record : recordList) {
-                try {
-                    byte[] byteArray = record.getData().array();
-                    //byte[] decoded = Base64.decode(byteArray);
-                    String message = new String(byteArray, 
Charset.forName("UTF-8"));
-                    reader.persistQueue.add(
-                            new StreamsDatum(
-                                    message,
-                                    record.getPartitionKey(),
-                                    new DateTime(),
-                                    new BigInteger(record.getSequenceNumber()),
-                                    metadata));
-                } catch( Exception e ) {
-                    LOGGER.warn("Exception processing record {}: {}", record, 
e);
-                }
-            }
-            try {
-                Thread.sleep(reader.pollInterval);
-            } catch (InterruptedException e) {}
+      for (Record record : recordList) {
+        try {
+          byte[] byteArray = record.getData().array();
+          //byte[] decoded = Base64.decode(byteArray);
+          String message = new String(byteArray, Charset.forName("UTF-8"));
+          reader.persistQueue.add(
+              new StreamsDatum(
+                  message,
+                  record.getPartitionKey(),
+                  new DateTime(),
+                  new BigInteger(record.getSequenceNumber()),
+                  metadata));
+        } catch ( Exception ex ) {
+          LOGGER.warn("Exception processing record {}: {}", record, ex);
         }
-
+      }
+      try {
+        Thread.sleep(reader.pollInterval);
+      } catch (InterruptedException ex) {
+        LOGGER.trace("InterruptedException", ex);
+      }
     }
 
+  }
+
 }

Reply via email to