http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
index 86ab72f..e322990 100644
--- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherQueryGraphHelper.java
@@ -18,17 +18,19 @@
 
 package org.apache.streams.graph.neo4j;
 
+import org.apache.streams.data.util.PropertyUtil;
+import org.apache.streams.graph.QueryGraphHelper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+import org.apache.streams.pojo.json.Activity;
+import org.apache.streams.pojo.json.ActivityObject;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.streams.data.util.PropertyUtil;
-import org.apache.streams.graph.QueryGraphHelper;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
+
 import org.javatuples.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,178 +44,196 @@ import java.util.Map;
  */
 public class CypherQueryGraphHelper implements QueryGraphHelper {
 
-    private final static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+  private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(Neo4jHttpGraphHelper.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jHttpGraphHelper.class);
 
-    public final static String getVertexLongIdStatementTemplate = "MATCH (v) WHERE ID(v) = <id> RETURN v";
-    public final static String getVertexStringIdStatementTemplate = "MATCH (v {id: '<id>'} ) RETURN v";
+  public static final String getVertexLongIdStatementTemplate = "MATCH (v) WHERE ID(v) = <id> RETURN v";
+  public static final String getVertexStringIdStatementTemplate = "MATCH (v {id: '<id>'} ) RETURN v";
 
-    public final static String createVertexStatementTemplate = "MATCH (x {id: '<id>'}) "+
-            "CREATE UNIQUE (v:<type> { props }) "+
-            "ON CREATE SET v <labels> "+
-            "RETURN v";
+  public static final String createVertexStatementTemplate =
+      "MATCH (x {id: '<id>'}) "
+          + "CREATE UNIQUE (v:<type> { props }) "
+          + "ON CREATE SET v <labels> "
+          + "RETURN v";
 
-    public final static String mergeVertexStatementTemplate = "MERGE (v:<type> {id: '<id>'}) "+
-            "ON CREATE SET v <labels>, v = { props }, v.`@timestamp` = timestamp() "+
-            "ON MATCH SET v <labels>, v = { props }, v.`@timestamp` = timestamp() "+
-            "RETURN v";
 
-    public final static String createEdgeStatementTemplate = "MATCH (s:<s_type> {id: '<s_id>'}),(d:<d_type> {id: '<d_id>'}) "+
-            "CREATE UNIQUE (s)-[r:<r_type> <r_props>]->(d) "+
-            "RETURN r";
 
-    public Pair<String, Map<String, Object>> getVertexRequest(String streamsId) {
+  public static final String mergeVertexStatementTemplate =
+      "MERGE (v:<type> {id: '<id>'}) "
+          + "ON CREATE SET v <labels>, v = { props }, v.`@timestamp` = 
timestamp() "
+          + "ON MATCH SET v <labels>, v = { props }, v.`@timestamp` = 
timestamp() "
+          + "RETURN v";
 
-        ST getVertex = new ST(getVertexStringIdStatementTemplate);
-        getVertex.add("id", streamsId);
+  public static final String createEdgeStatementTemplate =
+      "MATCH (s:<s_type> {id: '<s_id>'}),(d:<d_type> {id: '<d_id>'}) "
+          + "CREATE UNIQUE (s)-[r:<r_type> <r_props>]->(d) "
+          + "RETURN r";
 
-        Pair<String, Map<String, Object>> queryPlusParameters = new Pair(getVertex.render(), null);
+  /**
+   * getVertexRequest.
+   * @param streamsId streamsId
+   * @return pair (streamsId, parameterMap)
+   */
+  public Pair<String, Map<String, Object>> getVertexRequest(String streamsId) {
 
-        LOGGER.debug("getVertexRequest", queryPlusParameters.toString());
+    ST getVertex = new ST(getVertexStringIdStatementTemplate);
+    getVertex.add("id", streamsId);
 
-        return queryPlusParameters;
-    }
-
-    @Override
-    public Pair<String, Map<String, Object>> getVertexRequest(Long vertexId) {
+    Pair<String, Map<String, Object>> queryPlusParameters = new Pair(getVertex.render(), null);
 
-        ST getVertex = new ST(getVertexLongIdStatementTemplate);
-        getVertex.add("id", vertexId);
+    LOGGER.debug("getVertexRequest", queryPlusParameters.toString());
 
-        Pair<String, Map<String, Object>> queryPlusParameters = new Pair(getVertex.render(), null);
+    return queryPlusParameters;
+  }
 
-        LOGGER.debug("getVertexRequest", queryPlusParameters.toString());
+  /**
+   * getVertexRequest.
+   * @param vertexId numericId
+   * @return pair (streamsId, parameterMap)
+   */
+  @Override
+  public Pair<String, Map<String, Object>> getVertexRequest(Long vertexId) {
 
-        return queryPlusParameters;
+    ST getVertex = new ST(getVertexLongIdStatementTemplate);
+    getVertex.add("id", vertexId);
 
-    }
+    Pair<String, Map<String, Object>> queryPlusParameters = new Pair(getVertex.render(), null);
 
-    public Pair<String, Map<String, Object>> createVertexRequest(ActivityObject activityObject) {
+    LOGGER.debug("getVertexRequest", queryPlusParameters.toString());
 
-        Preconditions.checkNotNull(activityObject.getObjectType());
+    return queryPlusParameters;
 
-        List<String> labels = getLabels(activityObject);
+  }
 
-        ST createVertex = new ST(createVertexStatementTemplate);
-        createVertex.add("id", activityObject.getId());
-        createVertex.add("type", activityObject.getObjectType());
-        if( labels.size() > 0)
-            createVertex.add("labels", Joiner.on(' ').join(labels));
-        String query = createVertex.render();
+  /**
+   * createVertexRequest.
+   * @param activityObject activityObject
+   * @return pair (query, parameterMap)
+   */
+  public Pair<String, Map<String, Object>> createVertexRequest(ActivityObject activityObject) {
 
-        ObjectNode object = MAPPER.convertValue(activityObject, ObjectNode.class);
-        Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
+    Preconditions.checkNotNull(activityObject.getObjectType());
 
-        Pair<String, Map<String, Object>> queryPlusParameters = new Pair(createVertex.render(), props);
+    List<String> labels = getLabels(activityObject);
 
-        LOGGER.debug("createVertexRequest: ({},{})", query, props);
+    ST createVertex = new ST(createVertexStatementTemplate);
+    createVertex.add("id", activityObject.getId());
+    createVertex.add("type", activityObject.getObjectType());
 
-        return queryPlusParameters;
+    if ( labels.size() > 0 ) {
+      createVertex.add("labels", Joiner.on(' ').join(labels));
     }
 
-    public Pair<String, Map<String, Object>> mergeVertexRequest(ActivityObject activityObject) {
+    String query = createVertex.render();
 
-        Preconditions.checkNotNull(activityObject.getObjectType());
+    ObjectNode object = MAPPER.convertValue(activityObject, ObjectNode.class);
+    Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
 
-        Pair queryPlusParameters = new Pair(null, Maps.newHashMap());
+    Pair<String, Map<String, Object>> queryPlusParameters = new Pair(createVertex.render(), props);
 
-        List<String> labels = getLabels(activityObject);
+    LOGGER.debug("createVertexRequest: ({},{})", query, props);
 
-        ST mergeVertex = new ST(mergeVertexStatementTemplate);
-        mergeVertex.add("id", activityObject.getId());
-        mergeVertex.add("type", activityObject.getObjectType());
-        if( labels.size() > 0)
-            mergeVertex.add("labels", Joiner.on(' ').join(labels));
-        String query = mergeVertex.render();
+    return queryPlusParameters;
+  }
 
-        ObjectNode object = MAPPER.convertValue(activityObject, ObjectNode.class);
-        Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
+  /**
+   * mergeVertexRequest.
+   * @param activityObject activityObject
+   * @return pair (query, parameterMap)
+   */
+  public Pair<String, Map<String, Object>> mergeVertexRequest(ActivityObject activityObject) {
 
-        LOGGER.debug("mergeVertexRequest: ({},{})", query, props);
+    Preconditions.checkNotNull(activityObject.getObjectType());
 
-        queryPlusParameters = queryPlusParameters.setAt0(query);
-        queryPlusParameters = queryPlusParameters.setAt1(props);
+    Pair queryPlusParameters = new Pair(null, Maps.newHashMap());
 
-        return queryPlusParameters;
-    }
-
-    public Pair<String, Map<String, Object>> createEdgeRequest(Activity activity) {
-
-        Pair queryPlusParameters = new Pair(null, Maps.newHashMap());
-
-        ObjectNode object = MAPPER.convertValue(activity, ObjectNode.class);
-        Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
-
-        ST mergeEdge = new ST(createEdgeStatementTemplate);
-        mergeEdge.add("s_id", activity.getActor().getId());
-        mergeEdge.add("s_type", activity.getActor().getObjectType());
-        mergeEdge.add("d_id", activity.getObject().getId());
-        mergeEdge.add("d_type", activity.getObject().getObjectType());
-        mergeEdge.add("r_id", activity.getId());
-        mergeEdge.add("r_type", activity.getVerb());
-        mergeEdge.add("r_props", getPropertyCreater(props));
-
-        // set the activityObject's and extensions null, because their properties don't need to appear on the relationship
-        activity.setActor(null);
-        activity.setObject(null);
-        activity.setTarget(null);
-        activity.getAdditionalProperties().put("extensions", null);
-
-        String statement = mergeEdge.render();
-        queryPlusParameters = queryPlusParameters.setAt0(statement);
-        queryPlusParameters = queryPlusParameters.setAt1(props);
+    List<String> labels = getLabels(activityObject);
 
-        LOGGER.debug("createEdgeRequest: ({},{})", statement, props);
-
-        return queryPlusParameters;
-    }
-
-    public static String getPropertyValueSetter(Map<String, Object> map, String symbol) {
-        StringBuilder builder = new StringBuilder();
-        for( Map.Entry<String, Object> entry : map.entrySet()) {
-            if( entry.getValue() instanceof String ) {
-                String propVal = (String)(entry.getValue());
-                builder.append("," + symbol + ".`" + entry.getKey() + "` = '" + propVal + "'");
-            }
-        }
-        return builder.toString();
+    ST mergeVertex = new ST(mergeVertexStatementTemplate);
+    mergeVertex.add("id", activityObject.getId());
+    mergeVertex.add("type", activityObject.getObjectType());
+    if ( labels.size() > 0 ) {
+      mergeVertex.add("labels", Joiner.on(' ').join(labels));
     }
-
-    public static String getPropertyParamSetter(Map<String, Object> map, String symbol) {
-        StringBuilder builder = new StringBuilder();
-        for( Map.Entry<String, Object> entry : map.entrySet()) {
-            if( entry.getValue() instanceof String ) {
-                String propVal = (String)(entry.getValue());
-                builder.append("," + symbol + ".`" + entry.getKey() + "` = '" + propVal + "'");
-            }
-        }
-        return builder.toString();
+    String query = mergeVertex.render();
+
+    ObjectNode object = MAPPER.convertValue(activityObject, ObjectNode.class);
+    Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
+
+    LOGGER.debug("mergeVertexRequest: ({},{})", query, props);
+
+    queryPlusParameters = queryPlusParameters.setAt0(query);
+    queryPlusParameters = queryPlusParameters.setAt1(props);
+
+    return queryPlusParameters;
+  }
+
+  /**
+   * createEdgeRequest.
+   * @param activity activity
+   * @return pair (query, parameterMap)
+   */
+  public Pair<String, Map<String, Object>> createEdgeRequest(Activity activity) {
+
+    Pair queryPlusParameters = new Pair(null, Maps.newHashMap());
+
+    ObjectNode object = MAPPER.convertValue(activity, ObjectNode.class);
+    Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
+
+    ST mergeEdge = new ST(createEdgeStatementTemplate);
+    mergeEdge.add("s_id", activity.getActor().getId());
+    mergeEdge.add("s_type", activity.getActor().getObjectType());
+    mergeEdge.add("d_id", activity.getObject().getId());
+    mergeEdge.add("d_type", activity.getObject().getObjectType());
+    mergeEdge.add("r_id", activity.getId());
+    mergeEdge.add("r_type", activity.getVerb());
+    mergeEdge.add("r_props", getPropertyCreater(props));
+
+    // set the activityObject's and extensions null, because their properties don't need to appear on the relationship
+    activity.setActor(null);
+    activity.setObject(null);
+    activity.setTarget(null);
+    activity.getAdditionalProperties().put("extensions", null);
+
+    String statement = mergeEdge.render();
+    queryPlusParameters = queryPlusParameters.setAt0(statement);
+    queryPlusParameters = queryPlusParameters.setAt1(props);
+
+    LOGGER.debug("createEdgeRequest: ({},{})", statement, props);
+
+    return queryPlusParameters;
+  }
+
+  /**
+   * getPropertyCreater.
+   * @param map paramMap
+   * @return PropertyCreater string
+   */
+  public static String getPropertyCreater(Map<String, Object> map) {
+    StringBuilder builder = new StringBuilder();
+    builder.append("{");
+    List<String> parts = Lists.newArrayList();
+    for ( Map.Entry<String, Object> entry : map.entrySet()) {
+      if ( entry.getValue() instanceof String ) {
+        String propVal = (String) (entry.getValue());
+        parts.add("`" + entry.getKey() + "`:'" + propVal + "'");
+      }
     }
-
-    public static String getPropertyCreater(Map<String, Object> map) {
-        StringBuilder builder = new StringBuilder();
-        builder.append("{");
-        List<String> parts = Lists.newArrayList();
-        for( Map.Entry<String, Object> entry : map.entrySet()) {
-            if( entry.getValue() instanceof String ) {
-                String propVal = (String) (entry.getValue());
-                parts.add("`"+entry.getKey() + "`:'" + propVal + "'");
-            }
-        }
-        builder.append(Joiner.on(",").join(parts));
-        builder.append("}");
-        return builder.toString();
-    }
-
-    private List<String> getLabels(ActivityObject activityObject) {
-        List<String> labels = Lists.newArrayList(":streams");
-        if( activityObject.getAdditionalProperties().containsKey("labels") ) {
-            List<String> extraLabels = (List<String>)activityObject.getAdditionalProperties().get("labels");
-            for( String extraLabel : extraLabels )
-                labels.add(":"+extraLabel);
-        }
-        return labels;
+    builder.append(Joiner.on(",").join(parts));
+    builder.append("}");
+    return builder.toString();
+  }
+
+  private List<String> getLabels(ActivityObject activityObject) {
+    List<String> labels = Lists.newArrayList(":streams");
+    if ( activityObject.getAdditionalProperties().containsKey("labels") ) {
+      List<String> extraLabels = (List<String>)activityObject.getAdditionalProperties().get("labels");
+      for ( String extraLabel : extraLabels ) {
+        labels.add(":" + extraLabel);
+      }
     }
+    return labels;
+  }
 
 }
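
A minimal usage sketch of the StringTemplate-driven builders above (not part
of the commit; the id value is hypothetical):

    CypherQueryGraphHelper helper = new CypherQueryGraphHelper();
    // Render the string-id lookup template with id = "user123".
    Pair<String, Map<String, Object>> query = helper.getVertexRequest("user123");
    // query.getValue0() => "MATCH (v {id: 'user123'} ) RETURN v"
    // query.getValue1() is null here; the create/merge/edge requests instead
    // carry a property map flattened by PropertyUtil.flattenToMap.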

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java
index 48fe62d..72e668f 100644
--- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/Neo4jHttpGraphHelper.java
@@ -18,58 +18,59 @@
 
 package org.apache.streams.graph.neo4j;
 
+import org.apache.streams.graph.HttpGraphHelper;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.streams.data.util.PropertyUtil;
-import org.apache.streams.graph.HttpGraphHelper;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
+
 import org.javatuples.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.stringtemplate.v4.ST;
 
-import java.util.List;
 import java.util.Map;
 
 /**
- * Supporting class for interacting with neo4j via rest API
+ * Supporting class for interacting with neo4j via rest API.
  */
 public class Neo4jHttpGraphHelper implements HttpGraphHelper {
 
-    private final static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+  private static final ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(Neo4jHttpGraphHelper.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(Neo4jHttpGraphHelper.class);
 
-    public final static String statementKey = "statement";
-    public final static String paramsKey = "parameters";
-    public final static String propsKey = "props";
+  public static final String statementKey = "statement";
+  public static final String paramsKey = "parameters";
+  public static final String propsKey = "props";
 
-    public ObjectNode createHttpRequest(Pair<String, Map<String, Object>> queryPlusParameters) {
+  /**
+   * createHttpRequest neo4j rest json payload.
+   *
+   * @param queryPlusParameters (query, parameter map)
+   * @return ObjectNode
+   */
+  public ObjectNode createHttpRequest(Pair<String, Map<String, Object>> queryPlusParameters) {
 
-        LOGGER.debug("createHttpRequest: ", queryPlusParameters);
+    LOGGER.debug("createHttpRequest: ", queryPlusParameters);
 
-        Preconditions.checkNotNull(queryPlusParameters);
-        Preconditions.checkNotNull(queryPlusParameters.getValue0());
-        Preconditions.checkNotNull(queryPlusParameters.getValue1());
+    Preconditions.checkNotNull(queryPlusParameters);
+    Preconditions.checkNotNull(queryPlusParameters.getValue0());
+    Preconditions.checkNotNull(queryPlusParameters.getValue1());
 
-        ObjectNode request = MAPPER.createObjectNode();
+    ObjectNode request = MAPPER.createObjectNode();
 
-        request.put(statementKey, queryPlusParameters.getValue0());
+    request.put(statementKey, queryPlusParameters.getValue0());
 
-        ObjectNode params = MAPPER.createObjectNode();
-        ObjectNode props = MAPPER.convertValue(queryPlusParameters.getValue1(), ObjectNode.class);
+    ObjectNode params = MAPPER.createObjectNode();
+    ObjectNode props = MAPPER.convertValue(queryPlusParameters.getValue1(), ObjectNode.class);
 
-        params.put(propsKey, props);
-        request.put(paramsKey, params);
+    params.put(propsKey, props);
+    request.put(paramsKey, params);
 
-        LOGGER.debug("createHttpRequest: ", request);
+    LOGGER.debug("createHttpRequest: ", request);
 
-        return request;
-    }
+    return request;
+  }
 
 }
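
For reference, a sketch of the JSON body createHttpRequest produces, with key
names taken from statementKey, paramsKey, and propsKey above (the statement
and property values are hypothetical):

    {
      "statement" : "MATCH (v {id: 'user123'} ) RETURN v",
      "parameters" : {
        "props" : { "id" : "user123", "objectType" : "page" }
      }
    }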

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestCypherQueryGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestCypherQueryGraphHelper.java b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestCypherQueryGraphHelper.java
index 3f889aa..c29c8b7 100644
--- a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestCypherQueryGraphHelper.java
+++ b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestCypherQueryGraphHelper.java
@@ -16,98 +16,101 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.graph.test;
 
 import org.apache.streams.graph.neo4j.CypherQueryGraphHelper;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.ActivityObject;
+
 import org.javatuples.Pair;
 import org.junit.Test;
 
 import java.util.Map;
 
 /**
- * Created by sblackmon on 6/24/15.
+ * TestCypherQueryGraphHelper tests
+ * @see org.apache.streams.graph.neo4j.CypherQueryGraphHelper
  */
 public class TestCypherQueryGraphHelper {
 
-    CypherQueryGraphHelper helper = new CypherQueryGraphHelper();
+  CypherQueryGraphHelper helper = new CypherQueryGraphHelper();
 
-    @Test
-    public void getVertexRequestIdTest() throws Exception {
+  @Test
+  public void getVertexRequestIdTest() throws Exception {
 
-        Pair<String, Map<String, Object>> queryAndParams = helper.getVertexRequest("id");
-        assert(queryAndParams != null);
-        assert(queryAndParams.getValue0() != null);
+    Pair<String, Map<String, Object>> queryAndParams = helper.getVertexRequest("id");
+    assert(queryAndParams != null);
+    assert(queryAndParams.getValue0() != null);
 
-    }
+  }
 
-    @Test
-    public void getVertexRequestLongTest() throws Exception {
+  @Test
+  public void getVertexRequestLongTest() throws Exception {
 
-        Pair<String, Map<String, Object>> queryAndParams = helper.getVertexRequest(new Long(1));
+    Pair<String, Map<String, Object>> queryAndParams = helper.getVertexRequest(new Long(1));
 
-        assert(queryAndParams != null);
-        assert(queryAndParams.getValue0() != null);
+    assert(queryAndParams != null);
+    assert(queryAndParams.getValue0() != null);
 
-    }
+  }
 
-    @Test
-    public void createVertexRequestTest() throws Exception {
+  @Test
+  public void createVertexRequestTest() throws Exception {
 
-        ActivityObject activityObject = new ActivityObject();
-        activityObject.setId("id");
-        activityObject.setObjectType("type");
-        activityObject.setContent("content");
+    ActivityObject activityObject = new ActivityObject();
+    activityObject.setId("id");
+    activityObject.setObjectType("type");
+    activityObject.setContent("content");
 
-        Pair<String, Map<String, Object>> queryAndParams = helper.createVertexRequest(activityObject);
-        assert(queryAndParams != null);
-        assert(queryAndParams.getValue0() != null);
-        assert(queryAndParams.getValue1() != null);
+    Pair<String, Map<String, Object>> queryAndParams = helper.createVertexRequest(activityObject);
+    assert(queryAndParams != null);
+    assert(queryAndParams.getValue0() != null);
+    assert(queryAndParams.getValue1() != null);
 
-    }
+  }
 
-    @Test
-    public void mergeVertexRequestTest() throws Exception {
+  @Test
+  public void mergeVertexRequestTest() throws Exception {
 
-        ActivityObject activityObject = new ActivityObject();
-        activityObject.setId("id");
-        activityObject.setObjectType("type");
-        activityObject.setContent("content");
+    ActivityObject activityObject = new ActivityObject();
+    activityObject.setId("id");
+    activityObject.setObjectType("type");
+    activityObject.setContent("content");
 
-        Pair<String, Map<String, Object>> queryAndParams = helper.mergeVertexRequest(activityObject);
-        assert(queryAndParams != null);
-        assert(queryAndParams.getValue0() != null);
-        assert(queryAndParams.getValue1() != null);
+    Pair<String, Map<String, Object>> queryAndParams = helper.mergeVertexRequest(activityObject);
+    assert(queryAndParams != null);
+    assert(queryAndParams.getValue0() != null);
+    assert(queryAndParams.getValue1() != null);
 
-    }
+  }
 
-    @Test
-    public void createEdgeRequestTest() throws Exception {
+  @Test
+  public void createEdgeRequestTest() throws Exception {
 
-        ActivityObject actor = new ActivityObject();
-        actor.setId("actor");
-        actor.setObjectType("type");
-        actor.setContent("content");
+    ActivityObject actor = new ActivityObject();
+    actor.setId("actor");
+    actor.setObjectType("type");
+    actor.setContent("content");
 
-        ActivityObject object = new ActivityObject();
-        object.setId("object");
-        object.setObjectType("type");
-        object.setContent("content");
+    ActivityObject object = new ActivityObject();
+    object.setId("object");
+    object.setObjectType("type");
+    object.setContent("content");
 
-        Activity activity = new Activity();
-        activity.setId("activity");
-        activity.setVerb("verb");
-        activity.setContent("content");
+    Activity activity = new Activity();
+    activity.setId("activity");
+    activity.setVerb("verb");
+    activity.setContent("content");
 
-        activity.setActor(actor);
-        activity.setObject(object);
+    activity.setActor(actor);
+    activity.setObject(object);
 
-        Pair<String, Map<String, Object>> queryAndParams = helper.createEdgeRequest(activity);
+    Pair<String, Map<String, Object>> queryAndParams = helper.createEdgeRequest(activity);
 
-        assert(queryAndParams != null);
-        assert(queryAndParams.getValue0() != null);
-        assert(queryAndParams.getValue1() != null);
+    assert(queryAndParams != null);
+    assert(queryAndParams.getValue0() != null);
+    assert(queryAndParams.getValue1() != null);
 
-    }
+  }
 }
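
Tracing createEdgeRequestTest through the helper: with actor id "actor",
object id "object", and verb "verb", createEdgeStatementTemplate renders to
roughly

    MATCH (s:type {id: 'actor'}),(d:type {id: 'object'}) CREATE UNIQUE (s)-[r:verb {`id`:'activity',`verb`:'verb',`content`:'content', ...}]->(d) RETURN r

where r_props comes from getPropertyCreater over the flattened Activity; the
flattened actor.* and object.* String fields are included too, because the
props map is built before the actor and object are nulled out.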

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java
index eb7ce96..673b402 100644
--- a/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java
+++ b/streams-contrib/streams-persist-graph/src/test/java/org/apache/streams/graph/test/TestNeo4jHttpVertexReader.java
@@ -18,14 +18,16 @@
 
 package org.apache.streams.graph.test;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.commons.io.IOUtils;
 import org.apache.streams.graph.GraphHttpConfiguration;
 import org.apache.streams.graph.GraphReaderConfiguration;
 import org.apache.streams.graph.GraphVertexReader;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+import org.apache.commons.io.IOUtils;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -39,43 +41,43 @@ import java.util.List;
  * Unit test for
  * @see {@link org.apache.streams.graph.GraphVertexReader}
  *
- * Test that graph db responses can be converted to streams data
+ * Test that graph db responses can be converted to streams data.
  */
 public class TestNeo4jHttpVertexReader {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(TestNeo4jHttpVertexReader.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(TestNeo4jHttpVertexReader.class);
 
-    private final static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+  private static final ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
-    private JsonNode sampleReaderResult;
+  private JsonNode sampleReaderResult;
 
-    private GraphReaderConfiguration testConfiguration;
+  private GraphReaderConfiguration testConfiguration;
 
-    private GraphVertexReader graphPersistReader;
+  private GraphVertexReader graphPersistReader;
 
-    @Before
-    public void prepareTest() throws IOException {
+  @Before
+  public void prepareTest() throws IOException {
 
-        testConfiguration = new GraphReaderConfiguration();
-        testConfiguration.setType(GraphHttpConfiguration.Type.NEO_4_J);
+    testConfiguration = new GraphReaderConfiguration();
+    testConfiguration.setType(GraphHttpConfiguration.Type.NEO_4_J);
 
-        graphPersistReader = new GraphVertexReader(testConfiguration);
-        InputStream testActivityFileStream = TestNeo4jHttpVertexReader.class.getClassLoader()
-                .getResourceAsStream("sampleReaderResult.json");
-        String sampleText = IOUtils.toString(testActivityFileStream, "utf-8");
-        sampleReaderResult = mapper.readValue(sampleText, JsonNode.class);
+    graphPersistReader = new GraphVertexReader(testConfiguration);
+    InputStream testActivityFileStream = TestNeo4jHttpVertexReader.class.getClassLoader()
+        .getResourceAsStream("sampleReaderResult.json");
+    String sampleText = IOUtils.toString(testActivityFileStream, "utf-8");
+    sampleReaderResult = mapper.readValue(sampleText, JsonNode.class);
 
-    }
+  }
 
-    @Test
-    public void testParseNeoResult() throws IOException {
+  @Test
+  public void testParseNeoResult() throws IOException {
 
-        List<ObjectNode> result = graphPersistReader.parse(sampleReaderResult);
+    List<ObjectNode> result = graphPersistReader.parse(sampleReaderResult);
 
-        assert( result.size() == 10);
+    assert( result.size() == 10);
 
-        for( int i = 0 ; i < 10; i++ )
-            assert( result.get(i).get("extensions").size() == 5);
+    for( int i = 0 ; i < 10; i++ )
+      assert( result.get(i).get("extensions").size() == 5);
 
-    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java
index 1e066fb..c5a06fc 100644
--- a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java
+++ b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriter.java
@@ -18,13 +18,15 @@
 
 package org.apache.streams.hbase;
 
+import org.apache.streams.config.ComponentConfigurator;
+import org.apache.streams.config.StreamsConfigurator;
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsPersistWriter;
+import org.apache.streams.util.GuidUtils;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import java.io.Closeable;
-import java.io.Flushable;
-import java.io.IOException;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.client.HConnection;
@@ -33,187 +35,195 @@ import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.HTablePool;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfigurator;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsPersistWriter;
-import org.apache.streams.util.GuidUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class HbasePersistWriter implements StreamsPersistWriter, Flushable, Closeable
-{
-    public final static String STREAMS_ID = "HbasePersistWriter";
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(HbasePersistWriter.class);
-
-    protected HConnection connection;
-    protected HTablePool pool;
-    protected HTableInterface table;
-    protected HTableDescriptor descriptor;
-
-    protected volatile Queue<StreamsDatum> persistQueue;
-
-    private ObjectMapper mapper = new ObjectMapper();
-
-    private HbaseConfiguration config;
+import java.io.Closeable;
+import java.io.Flushable;
+import java.io.IOException;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
-    public HbasePersistWriter() {
-        this.config = new ComponentConfigurator<>(HbaseConfiguration.class)
-          .detectConfiguration(StreamsConfigurator.getConfig().getConfig("hbase"));
-        this.persistQueue  = new ConcurrentLinkedQueue<>();
+/**
+ * HbasePersistWriter writes to hbase.
+ */
+public class HbasePersistWriter implements StreamsPersistWriter, Flushable, Closeable {
+
+  public static final String STREAMS_ID = "HbasePersistWriter";
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(HbasePersistWriter.class);
+
+  protected HConnection connection;
+  protected HTablePool pool;
+  protected HTableInterface table;
+  protected HTableDescriptor descriptor;
+
+  protected volatile Queue<StreamsDatum> persistQueue;
+
+  private ObjectMapper mapper = new ObjectMapper();
+
+  private HbaseConfiguration config;
+
+  /**
+   * HbasePersistWriter constructor - resolve HbaseConfiguration from JVM 'hbase'.
+   */
+  public HbasePersistWriter() {
+    this.config = new ComponentConfigurator<>(HbaseConfiguration.class)
+        .detectConfiguration(StreamsConfigurator.getConfig().getConfig("hbase"));
+    this.persistQueue  = new ConcurrentLinkedQueue<>();
+  }
+
+  /**
+   * HbasePersistWriter constructor - use supplied persistQueue.
+   * @param persistQueue persistQueue
+   */
+  // TODO: refactor this to use HbaseConfiguration
+  public HbasePersistWriter(Queue<StreamsDatum> persistQueue) {
+    this.config = new ComponentConfigurator<>(HbaseConfiguration.class)
+        .detectConfiguration(StreamsConfigurator.getConfig().getConfig("hbase"));
+    this.persistQueue = persistQueue;
+  }
+
+  private synchronized void connectToHbase() {
+
+    // TODO: refactor this to resolve this stuff from typesafe
+    Configuration configuration = new Configuration();
+    configuration.set("hbase.rootdir", config.getRootdir());
+    configuration.set("zookeeper.znode.parent", config.getParent());
+    configuration.set("zookeeper.znode.rootserver", config.getRootserver());
+    //configuration.set("hbase.master", config.getRootserver());
+    //configuration.set("hbase.cluster.distributed", "false");
+    configuration.set("hbase.zookeeper.quorum", config.getQuorum());
+    configuration.set("hbase.zookeeper.property.clientPort", 
Long.toString(config.getClientPort()));
+    configuration.setInt("zookeeper.session.timeout", 1000);
+
+    configuration.setInt("timeout", 1000);
+
+    //pool = new HTablePool(configuration, 10);
+    try {
+      connection = HConnectionManager.createConnection(configuration);
+    } catch (Exception ex) {
+      ex.printStackTrace();
+      return;
     }
 
-    public HbasePersistWriter(Queue<StreamsDatum> persistQueue) {
-        this.config = new ComponentConfigurator<>(HbaseConfiguration.class)
-          .detectConfiguration(StreamsConfigurator.getConfig().getConfig("hbase"));
-        this.persistQueue = persistQueue;
+    try {
+      //    table = new HTable(configuration, config.getTable());
+      //    table = (HTable) pool.getTable(config.getTable());
+      table = new HTable(configuration, config.getTable().getBytes());
+      table.setAutoFlush(true);
+    } catch (Exception ex) {
+      ex.printStackTrace();
+      return;
     }
+    //
 
-    private synchronized void connectToHbase()
-    {
-        Configuration configuration = new Configuration();
-        configuration.set("hbase.rootdir", config.getRootdir());
-        configuration.set("zookeeper.znode.parent", config.getParent());
-        configuration.set("zookeeper.znode.rootserver", 
config.getRootserver());
-        //configuration.set("hbase.master", config.getRootserver());
-        //configuration.set("hbase.cluster.distributed", "false");
-        configuration.set("hbase.zookeeper.quorum", config.getQuorum());
-        configuration.set("hbase.zookeeper.property.clientPort", 
Long.toString(config.getClientPort()));
-        configuration.setInt("zookeeper.session.timeout", 1000);
-
-        configuration.setInt("timeout", 1000);
-
-        //pool = new HTablePool(configuration, 10);
-        try {
-            connection = HConnectionManager.createConnection(configuration);
-        } catch (Exception e) {
-            e.printStackTrace();
-            return;
-        }
-
-        try {
-        //    table = new HTable(configuration, config.getTable());
-        //    table = (HTable) pool.getTable(config.getTable());
-            table = new HTable(configuration, config.getTable().getBytes());
-            table.setAutoFlush(true);
-        } catch (Exception e) {
-            e.printStackTrace();
-            return;
-        }
-        //
-
-        try {
-            descriptor = table.getTableDescriptor();
-        } catch (Exception e) {
-            e.printStackTrace();
-            return;
-        }
-
-        try
-        {
-            LOGGER.info("Table : {}", descriptor);
-        }
-        catch (Exception e)
-        {
-            LOGGER.error("There was an error connecting to HBase, please check 
your settings and try again");
-            e.printStackTrace();
-        }
+    try {
+      descriptor = table.getTableDescriptor();
+    } catch (Exception ex) {
+      ex.printStackTrace();
+      return;
     }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
+    try {
+      LOGGER.info("Table : {}", descriptor);
+    } catch (Exception ex) {
+      LOGGER.error("There was an error connecting to HBase, please check your 
settings and try again");
+      ex.printStackTrace();
     }
-
-    @Override
-    public void write(StreamsDatum streamsDatum) {
-
-        ObjectNode node;
-        Put put = new Put();
-        if( streamsDatum.getDocument() instanceof String ) {
-            try {
-                node = mapper.readValue((String)streamsDatum.getDocument(), ObjectNode.class);
-            } catch (IOException e) {
-                e.printStackTrace();
-                LOGGER.warn("Invalid json: {}", 
streamsDatum.getDocument().toString());
-                return;
-            }
-            put.setId(GuidUtils.generateGuid(node.toString()));
-            try {
-                byte[] value = node.binaryValue();
-                put.add(config.getFamily().getBytes(), config.getQualifier().getBytes(), value);
-            } catch (IOException e) {
-                e.printStackTrace();
-                LOGGER.warn("Failure adding object: {}", 
streamsDatum.getDocument().toString());
-                return;
-            }
-        } else {
-            try {
-                node = mapper.valueToTree(streamsDatum.getDocument());
-            } catch (Exception e) {
-                e.printStackTrace();
-                LOGGER.warn("Invalid json: {}", 
streamsDatum.getDocument().toString());
-                return;
-            }
-            put.setId(GuidUtils.generateGuid(node.toString()));
-            try {
-                byte[] value = node.binaryValue();
-                put.add(config.getFamily().getBytes(), config.getQualifier().getBytes(), value);
-            } catch (IOException e) {
-                e.printStackTrace();
-                LOGGER.warn("Failure preparing put: {}", 
streamsDatum.getDocument().toString());
-                return;
-            }
-        }
-        try {
-            table.put(put);
-        } catch (IOException e) {
-            e.printStackTrace();
-            LOGGER.warn("Failure executin put: {}", 
streamsDatum.getDocument().toString());
-        }
-
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public void write(StreamsDatum streamsDatum) {
+
+    ObjectNode node;
+    Put put = new Put();
+    if ( streamsDatum.getDocument() instanceof String ) {
+      try {
+        node = mapper.readValue((String)streamsDatum.getDocument(), ObjectNode.class);
+      } catch (IOException ex) {
+        ex.printStackTrace();
+        LOGGER.warn("Invalid json: {}", streamsDatum.getDocument().toString());
+        return;
+      }
+      put.setId(GuidUtils.generateGuid(node.toString()));
+      try {
+        byte[] value = node.binaryValue();
+        put.add(config.getFamily().getBytes(), config.getQualifier().getBytes(), value);
+      } catch (IOException ex) {
+        ex.printStackTrace();
+        LOGGER.warn("Failure adding object: {}", 
streamsDatum.getDocument().toString());
+        return;
+      }
+    } else {
+      try {
+        node = mapper.valueToTree(streamsDatum.getDocument());
+      } catch (Exception ex) {
+        ex.printStackTrace();
+        LOGGER.warn("Invalid json: {}", streamsDatum.getDocument().toString());
+        return;
+      }
+      put.setId(GuidUtils.generateGuid(node.toString()));
+      try {
+        byte[] value = node.binaryValue();
+        put.add(config.getFamily().getBytes(), config.getQualifier().getBytes(), value);
+      } catch (IOException ex) {
+        ex.printStackTrace();
+        LOGGER.warn("Failure preparing put: {}", 
streamsDatum.getDocument().toString());
+        return;
+      }
     }
-
-    public void flush() throws IOException
-    {
-        table.flushCommits();
+    try {
+      table.put(put);
+    } catch (IOException ex) {
+      ex.printStackTrace();
+      LOGGER.warn("Failure executin put: {}", 
streamsDatum.getDocument().toString());
     }
 
-    public synchronized void close() throws IOException
-    {
-        table.close();
-    }
+  }
+
+  public void flush() throws IOException {
+    table.flushCommits();
+  }
 
-    @Override
-    public void prepare(Object configurationObject) {
+  public synchronized void close() throws IOException {
+    table.close();
+  }
 
-        connectToHbase();
+  @Override
+  public void prepare(Object configurationObject) {
 
-        Thread task = new Thread(new HbasePersistWriterTask(this));
-        task.start();
+    connectToHbase();
 
-        try {
-            task.join();
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
+    Thread task = new Thread(new HbasePersistWriterTask(this));
+    task.start();
 
+    try {
+      task.join();
+    } catch (InterruptedException ex) {
+      ex.printStackTrace();
     }
 
-    @Override
-    public void cleanUp() {
+  }
 
-        try {
-            flush();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
-        try {
-            close();
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
+  @Override
+  public void cleanUp() {
 
+    try {
+      flush();
+    } catch (IOException ex) {
+      ex.printStackTrace();
     }
+    try {
+      close();
+    } catch (IOException ex) {
+      ex.printStackTrace();
+    }
+
+  }
 }
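
The zero-argument constructor resolves its settings from the typesafe config
at the "hbase" key. A hypothetical HOCON sketch matching the getters used in
connectToHbase and write (key names inferred from the getters; all values are
placeholders):

    hbase {
      rootdir = "hdfs://localhost:8020/hbase"
      parent = "/hbase"
      rootserver = "root-region-server"
      quorum = "localhost"
      clientPort = 2181
      table = "streams"
      family = "activity"
      qualifier = "doc"
    }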

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriterTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriterTask.java b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriterTask.java
index 19a398d..eef7004 100644
--- a/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriterTask.java
+++ b/streams-contrib/streams-persist-hbase/src/main/java/org/apache/streams/hbase/HbasePersistWriterTask.java
@@ -19,38 +19,45 @@
 package org.apache.streams.hbase;
 
 import org.apache.streams.core.StreamsDatum;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Random;
 
+/**
+ * HbasePersistWriterTask writes to hbase on behalf of
+ * @see org.apache.streams.hbase.HbasePersistWriter
+ */
 public class HbasePersistWriterTask implements Runnable {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(HbasePersistWriterTask.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(HbasePersistWriterTask.class);
 
-    private HbasePersistWriter writer;
+  private HbasePersistWriter writer;
 
-    public HbasePersistWriterTask(HbasePersistWriter writer) {
-        this.writer = writer;
-    }
+  public HbasePersistWriterTask(HbasePersistWriter writer) {
+    this.writer = writer;
+  }
 
-    @Override
-    public void run() {
-
-        while(true) {
-            if( writer.persistQueue.peek() != null ) {
-                try {
-                    StreamsDatum entry = writer.persistQueue.remove();
-                    writer.write(entry);
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-            try {
-                Thread.sleep(new Random().nextInt(1));
-            } catch (InterruptedException ignored) {}
-        }
+  @Override
+  public void run() {
 
+    while (true) {
+      if ( writer.persistQueue.peek() != null ) {
+        try {
+          StreamsDatum entry = writer.persistQueue.remove();
+          writer.write(entry);
+        } catch (Exception ex) {
+          ex.printStackTrace();
+        }
+      }
+      try {
+        Thread.sleep(new Random().nextInt(1));
+      } catch (InterruptedException ignored) {
+        LOGGER.trace("ignored InterruptedException", ignored);
+      }
     }
 
+  }
+
 }
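
A sketch of the hand-off between writer and task (connection setup, normally
done in HbasePersistWriter.prepare(), is omitted; the datum document is
hypothetical):

    HbasePersistWriter writer = new HbasePersistWriter();
    // The task polls writer.persistQueue and passes each datum to writer.write().
    new Thread(new HbasePersistWriterTask(writer)).start();
    writer.persistQueue.add(new StreamsDatum("{\"id\":\"doc1\"}"));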

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConstants.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConstants.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConstants.java
index e96cc8d..b6660ab 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConstants.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/HdfsConstants.java
@@ -19,13 +19,13 @@
 package org.apache.streams.hdfs;
 
 /**
- * Predefined field symbols for streams-persist-hdfs
+ * Predefined field symbols for streams-persist-hdfs.
  */
 public class HdfsConstants {
 
-    protected static final String ID = "ID";
-    protected static final String TS = "TS";
-    protected static final String META = "META";
-    protected static final String DOC = "DOC";
+  protected static final String ID = "ID";
+  protected static final String TS = "TS";
+  protected static final String META = "META";
+  protected static final String DOC = "DOC";
 
 }
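
These symbols name the columns of the delimited line format used by the hdfs
reader and writer (DELIMITER is '\t' in WebHdfsPersistReader below). Assuming
the default column order ID, TS, META, DOC, a hypothetical line looks like:

    id-1234<TAB>2014-02-28T00:00:00Z<TAB>{}<TAB>{"id":"id-1234","verb":"post"}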

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
index 24c9737..d254abb 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReader.java
@@ -18,16 +18,6 @@
 
 package org.apache.streams.hdfs;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Queues;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfiguration;
 import org.apache.streams.config.StreamsConfigurator;
@@ -38,6 +28,18 @@ import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistReader;
 import org.apache.streams.core.StreamsResultSet;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Queues;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,220 +58,244 @@ import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 
 /**
- * Created by sblackmon on 2/28/14.
+ * WebHdfsPersistReader reads from hdfs.
  */
 public class WebHdfsPersistReader implements StreamsPersistReader, DatumStatusCountable {
 
-    public final static String STREAMS_ID = "WebHdfsPersistReader";
-
-    private final static Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistReader.class);
-
-    protected final static char DELIMITER = '\t';
-
-    protected FileSystem client;
-    protected Path path;
-    protected FileStatus[] status;
-
-    protected volatile Queue<StreamsDatum> persistQueue;
-
-    protected ObjectMapper mapper;
-    protected LineReadWriteUtil lineReaderUtil;
-
-    protected HdfsReaderConfiguration hdfsConfiguration;
-    protected StreamsConfiguration streamsConfiguration;
-
-    private ExecutorService executor;
-
-    protected DatumStatusCounter countersTotal = new DatumStatusCounter();
-    protected DatumStatusCounter countersCurrent = new DatumStatusCounter();
-    private Future<?> task;
-
-    public WebHdfsPersistReader() {
-        this(new ComponentConfigurator<>(HdfsReaderConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("hdfs")));
+  public static final String STREAMS_ID = "WebHdfsPersistReader";
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistReader.class);
+
+  protected static final char DELIMITER = '\t';
+
+  protected FileSystem client;
+  protected Path path;
+  protected FileStatus[] status;
+
+  protected volatile Queue<StreamsDatum> persistQueue;
+
+  protected ObjectMapper mapper;
+  protected LineReadWriteUtil lineReaderUtil;
+
+  protected HdfsReaderConfiguration hdfsConfiguration;
+  protected StreamsConfiguration streamsConfiguration;
+
+  private ExecutorService executor;
+
+  protected DatumStatusCounter countersTotal = new DatumStatusCounter();
+  protected DatumStatusCounter countersCurrent = new DatumStatusCounter();
+  private Future<?> task;
+
+  /**
+   * WebHdfsPersistReader constructor - resolves HdfsReaderConfiguration from JVM 'hdfs'.
+   */
+  public WebHdfsPersistReader() {
+    this(new ComponentConfigurator<>(HdfsReaderConfiguration.class).detectConfiguration(StreamsConfigurator.getConfig().getConfig("hdfs")));
+  }
+
+  /**
+   * WebHdfsPersistReader constructor - uses supplied HdfsReaderConfiguration.
+   * @param hdfsConfiguration hdfsConfiguration
+   */
+  public WebHdfsPersistReader(HdfsReaderConfiguration hdfsConfiguration) {
+    this.hdfsConfiguration = hdfsConfiguration;
+  }
+
+  /**
+   * getURI from hdfsConfiguration.
+   * @return URI
+   * @throws URISyntaxException URISyntaxException
+   */
+  public URI getURI() throws URISyntaxException {
+    StringBuilder uriBuilder = new StringBuilder();
+    uriBuilder.append(hdfsConfiguration.getScheme());
+    uriBuilder.append("://");
+    if ( !Strings.isNullOrEmpty(hdfsConfiguration.getHost())) {
+      uriBuilder.append(hdfsConfiguration.getHost());
+      if (hdfsConfiguration.getPort() != null) {
+        uriBuilder.append(":" + hdfsConfiguration.getPort());
+      }
+    } else {
+      uriBuilder.append("/");
     }
-
-    public WebHdfsPersistReader(HdfsReaderConfiguration hdfsConfiguration) {
-        this.hdfsConfiguration = hdfsConfiguration;
-    }
-
-    public URI getURI() throws URISyntaxException {
-        StringBuilder uriBuilder = new StringBuilder();
-        uriBuilder.append(hdfsConfiguration.getScheme());
-        uriBuilder.append("://");
-        if( !Strings.isNullOrEmpty(hdfsConfiguration.getHost())) {
-            uriBuilder.append(hdfsConfiguration.getHost());
-            if (hdfsConfiguration.getPort() != null)
-                uriBuilder.append(":" + hdfsConfiguration.getPort());
-        } else {
-            uriBuilder.append("/");
-        }
-        return new URI(uriBuilder.toString());
+    return new URI(uriBuilder.toString());
+  }
+
+  /**
+   * isConnected.
+   * @return true if connected, false otherwise
+   */
+  public boolean isConnected() {
+    return (client != null);
+  }
+
+  /**
+   * getFileSystem.
+   * @return FileSystem
+   */
+  public final synchronized FileSystem getFileSystem() {
+    // Check to see if we are connected.
+    if (!isConnected()) {
+      connectToWebHDFS();
     }
-
-    public boolean isConnected()                               { return (client != null); }
-
-    public final synchronized FileSystem getFileSystem()
-    {
-        // Check to see if we are connected.
-        if(!isConnected())
-            connectToWebHDFS();
-        return this.client;
-    }
-
-    private synchronized void connectToWebHDFS()
-    {
-        try
-        {
-            LOGGER.info("User : {}", this.hdfsConfiguration.getUser());
-            UserGroupInformation ugi = UserGroupInformation.createRemoteUser(this.hdfsConfiguration.getUser());
-            ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.SIMPLE);
-
-            ugi.doAs(new PrivilegedExceptionAction<Void>() {
-                public Void run() throws Exception {
-                    Configuration conf = new Configuration();
-                    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
-                    conf.set("fs.hdfs.impl", 
org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
-                    conf.set("fs.file.impl", 
org.apache.hadoop.fs.LocalFileSystem.class.getName());
-                    LOGGER.info("WebURI : {}", getURI().toString());
-                    client = FileSystem.get(getURI(), conf);
-                    LOGGER.info("Connected to WebHDFS");
-
-                    /*
-                    * ************************************************************************************************
-                    * This code is an example of how you would work with HDFS and you weren't going over
-                    * the webHDFS protocol.
-                    *
-                    * Smashew: 2013-10-01
-                    * ************************************************************************************************
-                    conf.set("fs.defaultFS", 
"hdfs://hadoop.mdigitallife.com:8020/user/" + userName);
-                    conf.set("namenode.host","0.0.0.0");
-                    conf.set("hadoop.job.ugi", userName);
-                    conf.set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, "runner");
-                    fileSystem.createNewFile(new Path("/user/"+ userName + "/test"));
-                    FileStatus[] status = fs.listStatus(new Path("/user/" + userName));
-                    for(int i=0;i<status.length;i++)
-                    {
-                        LOGGER.info("Directory: {}", status[i].getPath());
-                    }
-                    */
-                    return null;
-                }
-            });
+    return this.client;
+  }
+
+  // TODO: combine with WebHdfsPersistReader.connectToWebHDFS
+  private synchronized void connectToWebHDFS() {
+    try {
+      LOGGER.info("User : {}", this.hdfsConfiguration.getUser());
+      UserGroupInformation ugi = UserGroupInformation.createRemoteUser(this.hdfsConfiguration.getUser());
+      ugi.setAuthenticationMethod(UserGroupInformation.AuthenticationMethod.SIMPLE);
+
+      ugi.doAs(new PrivilegedExceptionAction<Void>() {
+        public Void run() throws Exception {
+          Configuration conf = new Configuration();
+          conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+          conf.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
+          conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
+          LOGGER.info("WebURI : {}", getURI().toString());
+          client = FileSystem.get(getURI(), conf);
+          LOGGER.info("Connected to WebHDFS");
+
+          /*
+          * ************************************************************************************************
+          * This code is an example of how you would work with HDFS and you weren't going over
+          * the webHDFS protocol.
+          *
+          * Smashew: 2013-10-01
+          * ************************************************************************************************
+          conf.set("fs.defaultFS", "hdfs://hadoop.mdigitallife.com:8020/user/" + userName);
+          conf.set("namenode.host","0.0.0.0");
+          conf.set("hadoop.job.ugi", userName);
+          conf.set(DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, "runner");
+          fileSystem.createNewFile(new Path("/user/"+ userName + "/test"));
+          FileStatus[] status = fs.listStatus(new Path("/user/" + userName));
+          for(int i=0;i<status.length;i++)
+          {
+              LOGGER.info("Directory: {}", status[i].getPath());
+          }
+          */
+          return null;
         }
-        catch (Exception e)
-        {
-            LOGGER.error("There was an error connecting to WebHDFS, please check your settings and try again");
-            e.printStackTrace();
-        }
-    }
-
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
-
-    @Override
-    public void prepare(Object configurationObject) {
-        LOGGER.debug("Prepare");
-        lineReaderUtil = LineReadWriteUtil.getInstance(hdfsConfiguration);
-        connectToWebHDFS();
-        String pathString = hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getReaderPath();
-        LOGGER.info("Path : {}", pathString);
-        path = new Path(pathString);
-        try {
-            if( client.isFile(path)) {
-                LOGGER.info("Found File");
-                FileStatus fileStatus = client.getFileStatus(path);
-                status = new FileStatus[1];
-                status[0] = fileStatus;
-            } else if( client.isDirectory(path)){
-                status = client.listStatus(path);
-                List<FileStatus> statusList = Lists.newArrayList(status);
-                Collections.sort(statusList);
-                status = statusList.toArray(new FileStatus[0]);
-                LOGGER.info("Found Directory : {} files", status.length);
-            } else {
-                LOGGER.error("Neither file nor directory, wtf");
-            }
-        } catch (IOException e) {
-            LOGGER.error("IOException", e);
-        }
-        streamsConfiguration = StreamsConfigurator.detectConfiguration();
-        persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(streamsConfiguration.getBatchSize().intValue()));
-        //persistQueue = Queues.synchronizedQueue(new ConcurrentLinkedQueue());
-        executor = Executors.newSingleThreadExecutor();
-        mapper = StreamsJacksonMapper.getInstance();
-    }
-
-    @Override
-    public void cleanUp() {
-
+      });
+    } catch (Exception ex) {
+      LOGGER.error("There was an error connecting to WebHDFS, please check your settings and try again");
+      ex.printStackTrace();
     }
-
-    @Override
-    public StreamsResultSet readAll() {
-        WebHdfsPersistReaderTask readerTask = new WebHdfsPersistReaderTask(this);
-        Thread readerThread = new Thread(readerTask);
-        readerThread.start();
-        try {
-            readerThread.join();
-        } catch (InterruptedException e) {}
-        return new StreamsResultSet(persistQueue);
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    LOGGER.debug("Prepare");
+    lineReaderUtil = LineReadWriteUtil.getInstance(hdfsConfiguration);
+    connectToWebHDFS();
+    String pathString = hdfsConfiguration.getPath() + "/" + hdfsConfiguration.getReaderPath();
+    LOGGER.info("Path : {}", pathString);
+    path = new Path(pathString);
+    try {
+      if ( client.isFile(path)) {
+        LOGGER.info("Found File");
+        FileStatus fileStatus = client.getFileStatus(path);
+        status = new FileStatus[1];
+        status[0] = fileStatus;
+      } else if ( client.isDirectory(path)) {
+        status = client.listStatus(path);
+        List<FileStatus> statusList = Lists.newArrayList(status);
+        Collections.sort(statusList);
+        status = statusList.toArray(new FileStatus[0]);
+        LOGGER.info("Found Directory : {} files", status.length);
+      } else {
+        LOGGER.error("Neither file nor directory, wtf");
+      }
+    } catch (IOException ex) {
+      LOGGER.error("IOException", ex);
     }
-
-    @Override
-    public void startStream() {
-        LOGGER.debug("startStream");
-        task = executor.submit(new WebHdfsPersistReaderTask(this));
+    streamsConfiguration = StreamsConfigurator.detectConfiguration();
+    persistQueue = Queues.synchronizedQueue(new LinkedBlockingQueue<StreamsDatum>(streamsConfiguration.getBatchSize().intValue()));
+    //persistQueue = Queues.synchronizedQueue(new ConcurrentLinkedQueue());
+    executor = Executors.newSingleThreadExecutor();
+    mapper = StreamsJacksonMapper.getInstance();
+  }
+
+  @Override
+  public void cleanUp() {
+
+  }
+
+  @Override
+  public StreamsResultSet readAll() {
+    WebHdfsPersistReaderTask readerTask = new WebHdfsPersistReaderTask(this);
+    Thread readerThread = new Thread(readerTask);
+    readerThread.start();
+    try {
+      readerThread.join();
+    } catch (InterruptedException ignored) {
+      LOGGER.trace("ignored InterruptedException", ignored);
     }
-
-    @Override
-    public StreamsResultSet readCurrent() {
-
-        StreamsResultSet current;
-
-        synchronized( WebHdfsPersistReader.class ) {
-            current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
-            current.setCounter(new DatumStatusCounter());
-            current.getCounter().add(countersCurrent);
-            countersTotal.add(countersCurrent);
-            countersCurrent = new DatumStatusCounter();
-            persistQueue.clear();
-        }
-
-        return current;
+    return new StreamsResultSet(persistQueue);
+  }
+
+  @Override
+  public void startStream() {
+    LOGGER.debug("startStream");
+    task = executor.submit(new WebHdfsPersistReaderTask(this));
+  }
+
+  @Override
+  public StreamsResultSet readCurrent() {
+
+    StreamsResultSet current;
+
+    synchronized ( WebHdfsPersistReader.class ) {
+      current = new StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
+      current.setCounter(new DatumStatusCounter());
+      current.getCounter().add(countersCurrent);
+      countersTotal.add(countersCurrent);
+      countersCurrent = new DatumStatusCounter();
+      persistQueue.clear();
     }
 
-    protected void write( StreamsDatum entry ) {
-        boolean success;
-        do {
-            synchronized( WebHdfsPersistReader.class ) {
-                success = persistQueue.offer(entry);
-            }
-            Thread.yield();
-        }
-        while( !success );
-    }
+    return current;
+  }
 
-    @Override
-    public StreamsResultSet readNew(BigInteger sequence) {
-        return null;
+  protected void write( StreamsDatum entry ) {
+    boolean success;
+    do {
+      synchronized ( WebHdfsPersistReader.class ) {
+        success = persistQueue.offer(entry);
+      }
+      Thread.yield();
     }
-
-    @Override
-    public StreamsResultSet readRange(DateTime start, DateTime end) {
-        return null;
+    while ( !success );
+  }
+
+  @Override
+  public StreamsResultSet readNew(BigInteger sequence) {
+    return null;
+  }
+
+  @Override
+  public StreamsResultSet readRange(DateTime start, DateTime end) {
+    return null;
+  }
+
+  @Override
+  public boolean isRunning() {
+    if ( task != null) {
+      return !task.isDone() && !task.isCancelled();
+    } else {
+      return true;
     }
+  }
 
-    @Override
-    public boolean isRunning() {
-        if( task != null)
-            return !task.isDone() && !task.isCancelled();
-        else return true;
-    }
-
-    @Override
-    public DatumStatusCounter getDatumStatusCounter() {
-        return countersTotal;
-    }
+  @Override
+  public DatumStatusCounter getDatumStatusCounter() {
+    return countersTotal;
+  }
 }
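
To make the URI-building rules in getURI() above easier to check at a glance, here is a minimal, self-contained sketch of the same scheme://host[:port] assembly. Everything in it is illustrative: the class name, the main() driver, and the example values (webhdfs, namenode.example.com, 50070) are hypothetical stand-ins for what the HdfsConfiguration bean would supply, and a plain null/empty check stands in for Guava's Strings.isNullOrEmpty.

import java.net.URI;
import java.net.URISyntaxException;

public class WebHdfsUriSketch {

  // Mirrors the scheme://host[:port] assembly in WebHdfsPersistReader.getURI().
  public static URI buildUri(String scheme, String host, Integer port) throws URISyntaxException {
    StringBuilder uriBuilder = new StringBuilder();
    uriBuilder.append(scheme);
    uriBuilder.append("://");
    if (host != null && !host.isEmpty()) {
      uriBuilder.append(host);
      // Port is optional; only appended when configured, as in the patch.
      if (port != null) {
        uriBuilder.append(":").append(port);
      }
    } else {
      // No host configured: append "/" so the authority stays empty, as the patch does.
      uriBuilder.append("/");
    }
    return new URI(uriBuilder.toString());
  }

  public static void main(String[] args) throws URISyntaxException {
    System.out.println(buildUri("webhdfs", "namenode.example.com", 50070)); // webhdfs://namenode.example.com:50070
    System.out.println(buildUri("hdfs", null, null));                       // hdfs:///
  }
}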

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
index c5c1ffe..5bff080 100644
--- a/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
+++ b/streams-contrib/streams-persist-hdfs/src/main/java/org/apache/streams/hdfs/WebHdfsPersistReaderTask.java
@@ -18,98 +18,100 @@
 
 package org.apache.streams.hdfs;
 
-import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.streams.core.DatumStatus;
+import org.apache.streams.core.StreamsDatum;
+
 import com.google.common.base.Strings;
-import com.google.common.collect.Maps;
 import com.google.common.util.concurrent.Uninterruptibles;
+
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.streams.converter.LineReadWriteUtil;
-import org.apache.streams.core.DatumStatus;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.BufferedReader;
-import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.zip.GZIPInputStream;
 
+/**
+ * WebHdfsPersistReaderTask reads from hdfs on behalf of
+ * @see org.apache.streams.hdfs.WebHdfsPersistReader
+ */
 public class WebHdfsPersistReaderTask implements Runnable {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistReaderTask.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(WebHdfsPersistReaderTask.class);
 
-    private WebHdfsPersistReader reader;
+  private WebHdfsPersistReader reader;
 
-    public WebHdfsPersistReaderTask(WebHdfsPersistReader reader) {
-        this.reader = reader;
-    }
+  public WebHdfsPersistReaderTask(WebHdfsPersistReader reader) {
+    this.reader = reader;
+  }
+
+  @Override
+  public void run() {
 
-    @Override
-    public void run() {
+    LOGGER.info("WebHdfsPersistReaderTask: files to process");
 
-        LOGGER.info("WebHdfsPersistReaderTask: files to process");
+    for ( FileStatus fileStatus : reader.status ) {
+      LOGGER.info("    " + fileStatus.getPath().getName());
+    }
 
-        for( FileStatus fileStatus : reader.status ) {
-            LOGGER.info("    " + fileStatus.getPath().getName());
+    for ( FileStatus fileStatus : reader.status ) {
+      InputStream inputStream;
+      InputStreamReader inputStreamReader;
+      BufferedReader bufferedReader;
+      if ( fileStatus.isFile() && !fileStatus.getPath().getName().startsWith("_")) {
+        HdfsWriterConfiguration.Compression compression = HdfsWriterConfiguration.Compression.NONE;
+        if ( fileStatus.getPath().getName().endsWith(".gz")) {
+          compression = HdfsWriterConfiguration.Compression.GZIP;
+        }
+        LOGGER.info("Started Processing: {} Encoding: {} Compression: {}", 
fileStatus.getPath().getName(), reader.hdfsConfiguration.getEncoding(), 
compression.toString());
+        try {
+          inputStream = reader.client.open(fileStatus.getPath());
+          if ( compression.equals(HdfsWriterConfiguration.Compression.GZIP)) {
+            inputStream = new GZIPInputStream(inputStream);
+          }
+          inputStreamReader = new InputStreamReader(inputStream, reader.hdfsConfiguration.getEncoding());
+          bufferedReader = new BufferedReader(inputStreamReader);
+        } catch (Exception ex) {
+          LOGGER.error("Exception Opening " + fileStatus.getPath(), 
ex.getMessage());
+          return;
         }
 
-        for( FileStatus fileStatus : reader.status ) {
-            InputStream inputStream;
-            InputStreamReader inputStreamReader;
-            BufferedReader bufferedReader;
-            if( fileStatus.isFile() && !fileStatus.getPath().getName().startsWith("_")) {
-                HdfsWriterConfiguration.Compression compression = HdfsWriterConfiguration.Compression.NONE;
-                if( fileStatus.getPath().getName().endsWith(".gz"))
-                    compression = HdfsWriterConfiguration.Compression.GZIP;
-                LOGGER.info("Started Processing: {} Encoding: {} Compression: {}", fileStatus.getPath().getName(), reader.hdfsConfiguration.getEncoding(), compression.toString());
-                try {
-                    inputStream = reader.client.open(fileStatus.getPath());
-                    if( compression.equals(HdfsWriterConfiguration.Compression.GZIP))
-                        inputStream = new GZIPInputStream(inputStream);
-                    inputStreamReader = new InputStreamReader(inputStream, reader.hdfsConfiguration.getEncoding());
-                    bufferedReader = new BufferedReader(inputStreamReader);
-                } catch (Exception e) {
-                    LOGGER.error("Exception Opening " + fileStatus.getPath(), e.getMessage());
-                    return;
-                }
-
-                String line = "";
-                do{
-                    try {
-                        line = bufferedReader.readLine();
-                        if( !Strings.isNullOrEmpty(line) ) {
-                            reader.countersCurrent.incrementAttempt();
-                            StreamsDatum entry = reader.lineReaderUtil.processLine(line);
-                            if( entry != null ) {
-                                reader.write(entry);
-                                reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
-                            } else {
-                                LOGGER.warn("processLine failed");
-                                reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
-                            }
-                        }
-                    } catch (Exception e) {
-                        LOGGER.warn("WebHdfsPersistReader readLine Exception: {}", e);
-                        reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
-                    }
-                } while( !Strings.isNullOrEmpty(line) );
-                LOGGER.info("Finished Processing " + 
fileStatus.getPath().getName());
-                try {
-                    bufferedReader.close();
-                } catch (Exception e) {
-                    LOGGER.error("WebHdfsPersistReader Exception: {}", e);
-                }
+        String line = "";
+        do {
+          try {
+            line = bufferedReader.readLine();
+            if ( !Strings.isNullOrEmpty(line) ) {
+              reader.countersCurrent.incrementAttempt();
+              StreamsDatum entry = reader.lineReaderUtil.processLine(line);
+              if ( entry != null ) {
+                reader.write(entry);
+                reader.countersCurrent.incrementStatus(DatumStatus.SUCCESS);
+              } else {
+                LOGGER.warn("processLine failed");
+                reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
+              }
             }
+          } catch (Exception ex) {
+            LOGGER.warn("WebHdfsPersistReader readLine Exception: {}", ex);
+            reader.countersCurrent.incrementStatus(DatumStatus.FAIL);
+          }
         }
+        while ( !Strings.isNullOrEmpty(line) );
+        LOGGER.info("Finished Processing " + fileStatus.getPath().getName());
+        try {
+          bufferedReader.close();
+        } catch (Exception ex) {
+          LOGGER.error("WebHdfsPersistReader Exception: {}", ex);
+        }
+      }
+    }
 
-        LOGGER.info("WebHdfsPersistReaderTask Finished");
+    LOGGER.info("WebHdfsPersistReaderTask Finished");
 
-        Uninterruptibles.sleepUninterruptibly(15, TimeUnit.SECONDS);
-    }
+    Uninterruptibles.sleepUninterruptibly(15, TimeUnit.SECONDS);
+  }
 
 }
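
To see the control flow of the per-file read loop above outside the diff, here is a minimal, self-contained sketch under stated assumptions: the class name and main() driver are hypothetical, and an in-memory gzip payload stands in for reader.client.open(fileStatus.getPath()). One deliberate difference worth noting: the sketch reads to end-of-stream (readLine() returning null), whereas the patched loop exits at the first null or empty line, so a blank line mid-file ends processing of that file.

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class HdfsLineReadSketch {

  // Mirrors the loop's shape: detect gzip by suffix, wrap the stream, read line by line.
  public static void readLines(String fileName, InputStream inputStream, String encoding) throws IOException {
    if (fileName.endsWith(".gz")) {
      inputStream = new GZIPInputStream(inputStream);
    }
    try (BufferedReader bufferedReader =
        new BufferedReader(new InputStreamReader(inputStream, encoding))) {
      String line;
      while ((line = bufferedReader.readLine()) != null) {
        if (!line.isEmpty()) {
          // This is where the reader would call lineReaderUtil.processLine(line)
          // and offer the resulting StreamsDatum to its queue.
          System.out.println("processed: " + line);
        }
      }
    }
  }

  public static void main(String[] args) throws IOException {
    // Build a tiny gzipped payload in memory to stand in for an HDFS file.
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (GZIPOutputStream gzip = new GZIPOutputStream(bytes)) {
      gzip.write("one\ntwo\n".getBytes(StandardCharsets.UTF_8));
    }
    readLines("example.gz", new ByteArrayInputStream(bytes.toByteArray()), "UTF-8");
  }
}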
