http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistUpdaterIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistUpdaterIT.java
 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistUpdaterIT.java
index d34f53f..e356aff 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistUpdaterIT.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistUpdaterIT.java
@@ -18,27 +18,24 @@
 
 package org.apache.streams.elasticsearch.test;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import org.apache.commons.io.Charsets;
-import org.apache.commons.io.IOUtils;
 import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.elasticsearch.ElasticsearchClientManager;
 import org.apache.streams.elasticsearch.ElasticsearchPersistUpdater;
-import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
 import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.ActivityObject;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
 import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
 import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
 import org.elasticsearch.action.search.SearchRequestBuilder;
@@ -53,151 +50,150 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.InputStream;
 import java.util.List;
-import java.util.Properties;
 
 import static junit.framework.TestCase.assertTrue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
 /**
- * Created by sblackmon on 10/20/14.
+ * Integration Test for
+ * @see org.apache.streams.elasticsearch.ElasticsearchPersistUpdater
  */
 public class ElasticsearchPersistUpdaterIT {
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(ElasticsearchPersistUpdaterIT.class);
-
-    private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
-
-    protected ElasticsearchWriterConfiguration testConfiguration;
-    protected Client testClient;
-
-    @Before
-    public void prepareTest() throws Exception {
-
-        Config reference  = ConfigFactory.load();
-        File conf_file = new 
File("target/test-classes/ElasticsearchPersistUpdaterIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = 
ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
-        Config typesafe  = 
testResourceConfig.withFallback(reference).resolve();
-        testConfiguration = new 
ComponentConfigurator<>(ElasticsearchWriterConfiguration.class).detectConfiguration(typesafe,
 "elasticsearch");
-        testClient = new 
ElasticsearchClientManager(testConfiguration).getClient();
-
-        ClusterHealthRequest clusterHealthRequest = 
Requests.clusterHealthRequest();
-        ClusterHealthResponse clusterHealthResponse = 
testClient.admin().cluster().health(clusterHealthRequest).actionGet();
-        assertNotEquals(clusterHealthResponse.getStatus(), 
ClusterHealthStatus.RED);
-
-        IndicesExistsRequest indicesExistsRequest = 
Requests.indicesExistsRequest(testConfiguration.getIndex());
-        IndicesExistsResponse indicesExistsResponse = 
testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        assertTrue(indicesExistsResponse.isExists());
-
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ElasticsearchPersistUpdaterIT.class);
+
+  private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+
+  protected ElasticsearchWriterConfiguration testConfiguration;
+  protected Client testClient;
+
+  @Before
+  public void prepareTest() throws Exception {
+
+    Config reference  = ConfigFactory.load();
+    File conf_file = new 
File("target/test-classes/ElasticsearchPersistUpdaterIT.conf");
+    assert(conf_file.exists());
+    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new 
ComponentConfigurator<>(ElasticsearchWriterConfiguration.class).detectConfiguration(typesafe,
 "elasticsearch");
+    testClient = new ElasticsearchClientManager(testConfiguration).getClient();
+
+    ClusterHealthRequest clusterHealthRequest = 
Requests.clusterHealthRequest();
+    ClusterHealthResponse clusterHealthResponse = 
testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+    assertNotEquals(clusterHealthResponse.getStatus(), 
ClusterHealthStatus.RED);
+
+    IndicesExistsRequest indicesExistsRequest = 
Requests.indicesExistsRequest(testConfiguration.getIndex());
+    IndicesExistsResponse indicesExistsResponse = 
testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+    assertTrue(indicesExistsResponse.isExists());
+
+  }
+
+  @Test
+  public void testPersistUpdater() throws Exception {
+
+    IndicesExistsRequest indicesExistsRequest = 
Requests.indicesExistsRequest(testConfiguration.getIndex());
+    IndicesExistsResponse indicesExistsResponse = 
testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+    assertTrue(indicesExistsResponse.isExists());
+
+    SearchRequestBuilder countRequest = testClient
+        .prepareSearch(testConfiguration.getIndex())
+        .setTypes(testConfiguration.getType());
+    SearchResponse countResponse = countRequest.execute().actionGet();
+
+    long count = countResponse.getHits().getTotalHits();
+
+    ElasticsearchPersistUpdater testPersistUpdater = new 
ElasticsearchPersistUpdater(testConfiguration);
+    testPersistUpdater.prepare(null);
+
+    InputStream testActivityFolderStream = 
ElasticsearchPersistUpdaterIT.class.getClassLoader()
+        .getResourceAsStream("activities");
+    List<String> files = IOUtils.readLines(testActivityFolderStream, 
Charsets.UTF_8);
+
+    for( String file : files) {
+      LOGGER.info("File: " + file );
+      InputStream testActivityFileStream = 
ElasticsearchPersistUpdaterIT.class.getClassLoader()
+          .getResourceAsStream("activities/" + file);
+      Activity activity = MAPPER.readValue(testActivityFileStream, 
Activity.class);
+      Activity update = new Activity();
+      update.setAdditionalProperty("updated", Boolean.TRUE);
+      update.setAdditionalProperty("str", "str");
+      update.setAdditionalProperty("long", 10l);
+      update.setActor(
+          new ActivityObject()
+              .withAdditionalProperty("updated", Boolean.TRUE)
+              .withAdditionalProperty("double", 10d)
+              .withAdditionalProperty("map",
+                  MAPPER.createObjectNode().set("field", 
MAPPER.createArrayNode().add("item"))));
+
+      StreamsDatum datum = new StreamsDatum(update, activity.getVerb());
+      testPersistUpdater.write( datum );
+      LOGGER.info("Updated: " + activity.getVerb() );
     }
 
-    @Test
-    public void testPersistUpdater() throws Exception {
-
-        IndicesExistsRequest indicesExistsRequest = 
Requests.indicesExistsRequest(testConfiguration.getIndex());
-        IndicesExistsResponse indicesExistsResponse = 
testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        assertTrue(indicesExistsResponse.isExists());
-
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getIndex())
-                .setTypes(testConfiguration.getType());
-        SearchResponse countResponse = countRequest.execute().actionGet();
+    testPersistUpdater.cleanUp();
 
-        long count = countResponse.getHits().getTotalHits();
+    SearchRequestBuilder updatedCountRequest = testClient
+        .prepareSearch(testConfiguration.getIndex())
+        .setTypes(testConfiguration.getType())
+        .setQuery(QueryBuilders.existsQuery("updated"));
+    SearchResponse updatedCount = updatedCountRequest.execute().actionGet();
 
-        ElasticsearchPersistUpdater testPersistUpdater = new 
ElasticsearchPersistUpdater(testConfiguration);
-        testPersistUpdater.prepare(null);
+    LOGGER.info("updated: {}", updatedCount.getHits().getTotalHits());
 
-        InputStream testActivityFolderStream = 
ElasticsearchPersistUpdaterIT.class.getClassLoader()
-                .getResourceAsStream("activities");
-        List<String> files = IOUtils.readLines(testActivityFolderStream, 
Charsets.UTF_8);
+    assertEquals(count, updatedCount.getHits().getTotalHits());
 
-        for( String file : files) {
-            LOGGER.info("File: " + file );
-            InputStream testActivityFileStream = 
ElasticsearchPersistUpdaterIT.class.getClassLoader()
-                    .getResourceAsStream("activities/" + file);
-            Activity activity = MAPPER.readValue(testActivityFileStream, 
Activity.class);
-            Activity update = new Activity();
-            update.setAdditionalProperty("updated", Boolean.TRUE);
-            update.setAdditionalProperty("str", "str");
-            update.setAdditionalProperty("long", 10l);
-            update.setActor(
-                    new ActivityObject()
-                    .withAdditionalProperty("updated", Boolean.TRUE)
-                    .withAdditionalProperty("double", 10d)
-                    .withAdditionalProperty("map",
-                            MAPPER.createObjectNode().set("field", 
MAPPER.createArrayNode().add("item"))));
+    SearchRequestBuilder actorUpdatedCountRequest = testClient
+        .prepareSearch(testConfiguration.getIndex())
+        .setTypes(testConfiguration.getType())
+        .setQuery(QueryBuilders.termQuery("actor.updated", true));
+    SearchResponse actorUpdatedCount = 
actorUpdatedCountRequest.execute().actionGet();
 
-            StreamsDatum datum = new StreamsDatum(update, activity.getVerb());
-            testPersistUpdater.write( datum );
-            LOGGER.info("Updated: " + activity.getVerb() );
-        }
+    LOGGER.info("actor.updated: {}", 
actorUpdatedCount.getHits().getTotalHits());
 
-        testPersistUpdater.cleanUp();
+    assertEquals(count, actorUpdatedCount.getHits().getTotalHits());
 
-        SearchRequestBuilder updatedCountRequest = testClient
-                .prepareSearch(testConfiguration.getIndex())
-                .setTypes(testConfiguration.getType())
-                .setQuery(QueryBuilders.existsQuery("updated"));
-        SearchResponse updatedCount = 
updatedCountRequest.execute().actionGet();
+    SearchRequestBuilder strUpdatedCountRequest = testClient
+        .prepareSearch(testConfiguration.getIndex())
+        .setTypes(testConfiguration.getType())
+        .setQuery(QueryBuilders.termQuery("str", "str"));
+    SearchResponse strUpdatedCount = 
strUpdatedCountRequest.execute().actionGet();
 
-        LOGGER.info("updated: {}", updatedCount.getHits().getTotalHits());
+    LOGGER.info("strupdated: {}", strUpdatedCount.getHits().getTotalHits());
 
-        assertEquals(count, updatedCount.getHits().getTotalHits());
+    assertEquals(count, strUpdatedCount.getHits().getTotalHits());
 
-        SearchRequestBuilder actorUpdatedCountRequest = testClient
-                .prepareSearch(testConfiguration.getIndex())
-                .setTypes(testConfiguration.getType())
-                .setQuery(QueryBuilders.termQuery("actor.updated", true));
-        SearchResponse actorUpdatedCount = 
actorUpdatedCountRequest.execute().actionGet();
+    SearchRequestBuilder longUpdatedCountRequest = testClient
+        .prepareSearch(testConfiguration.getIndex())
+        .setTypes(testConfiguration.getType())
+        .setQuery(QueryBuilders.rangeQuery("long").from(9).to(11));
+    SearchResponse longUpdatedCount = 
longUpdatedCountRequest.execute().actionGet();
 
-        LOGGER.info("actor.updated: {}", 
actorUpdatedCount.getHits().getTotalHits());
+    LOGGER.info("longupdated: {}", longUpdatedCount.getHits().getTotalHits());
 
-        assertEquals(count, actorUpdatedCount.getHits().getTotalHits());
+    assertEquals(count, longUpdatedCount.getHits().getTotalHits());
 
-        SearchRequestBuilder strUpdatedCountRequest = testClient
-                .prepareSearch(testConfiguration.getIndex())
-                .setTypes(testConfiguration.getType())
-                .setQuery(QueryBuilders.termQuery("str", "str"));
-        SearchResponse strUpdatedCount = 
strUpdatedCountRequest.execute().actionGet();
+    SearchRequestBuilder doubleUpdatedCountRequest = testClient
+        .prepareSearch(testConfiguration.getIndex())
+        .setTypes(testConfiguration.getType())
+        .setQuery(QueryBuilders.rangeQuery("actor.double").from(9).to(11)); // set to 10d on actor
+    SearchResponse doubleUpdatedCount = 
doubleUpdatedCountRequest.execute().actionGet();
 
-        LOGGER.info("strupdated: {}", 
strUpdatedCount.getHits().getTotalHits());
+    LOGGER.info("doubleupdated: {}", 
doubleUpdatedCount.getHits().getTotalHits());
 
-        assertEquals(count, strUpdatedCount.getHits().getTotalHits());
+    assertEquals(count, doubleUpdatedCount.getHits().getTotalHits());
 
-        SearchRequestBuilder longUpdatedCountRequest = testClient
-                .prepareSearch(testConfiguration.getIndex())
-                .setTypes(testConfiguration.getType())
-                .setQuery(QueryBuilders.rangeQuery("long").from(9).to(11));
-        SearchResponse longUpdatedCount = 
longUpdatedCountRequest.execute().actionGet();
+    SearchRequestBuilder mapUpdatedCountRequest = testClient
+        .prepareSearch(testConfiguration.getIndex())
+        .setTypes(testConfiguration.getType())
+        .setQuery(QueryBuilders.termQuery("actor.map.field", "item"));
+    SearchResponse mapUpdatedCount = 
mapUpdatedCountRequest.execute().actionGet();
 
-        LOGGER.info("longupdated: {}", 
longUpdatedCount.getHits().getTotalHits());
+    LOGGER.info("mapfieldupdated: {}", 
mapUpdatedCount.getHits().getTotalHits());
 
-        assertEquals(count, longUpdatedCount.getHits().getTotalHits());
+    assertEquals(count, mapUpdatedCount.getHits().getTotalHits());
 
-        SearchRequestBuilder doubleUpdatedCountRequest = testClient
-                .prepareSearch(testConfiguration.getIndex())
-                .setTypes(testConfiguration.getType())
-                .setQuery(QueryBuilders.rangeQuery("long").from(9).to(11));
-        SearchResponse doubleUpdatedCount = 
doubleUpdatedCountRequest.execute().actionGet();
-
-        LOGGER.info("doubleupdated: {}", 
doubleUpdatedCount.getHits().getTotalHits());
-
-        assertEquals(count, doubleUpdatedCount.getHits().getTotalHits());
-
-        SearchRequestBuilder mapUpdatedCountRequest = testClient
-                .prepareSearch(testConfiguration.getIndex())
-                .setTypes(testConfiguration.getType())
-                .setQuery(QueryBuilders.termQuery("actor.map.field", "item"));
-        SearchResponse mapUpdatedCount = 
mapUpdatedCountRequest.execute().actionGet();
-
-        LOGGER.info("mapfieldupdated: {}", 
mapUpdatedCount.getHits().getTotalHits());
-
-        assertEquals(count, mapUpdatedCount.getHits().getTotalHits());
-
-    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java
 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java
index f291dcd..f290971 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchPersistWriterIT.java
@@ -18,21 +18,21 @@
 
 package org.apache.streams.elasticsearch.test;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import org.apache.commons.io.Charsets;
-import org.apache.commons.io.IOUtils;
 import org.apache.streams.config.ComponentConfigurator;
-import org.apache.streams.config.StreamsConfiguration;
-import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.elasticsearch.ElasticsearchClientManager;
 import org.apache.streams.elasticsearch.ElasticsearchPersistWriter;
 import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
@@ -58,71 +58,72 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
 /**
- * Created by sblackmon on 10/20/14.
+ * Integration Test for
+ * @see org.apache.streams.elasticsearch.ElasticsearchPersistWriter
  */
 public class ElasticsearchPersistWriterIT {
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(ElasticsearchPersistWriterIT.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ElasticsearchPersistWriterIT.class);
 
-    private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+  private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
 
-    protected ElasticsearchWriterConfiguration testConfiguration;
-    protected Client testClient;
+  protected ElasticsearchWriterConfiguration testConfiguration;
+  protected Client testClient;
 
-    @Before
-    public void prepareTest() throws Exception {
+  @Before
+  public void prepareTest() throws Exception {
 
-        Config reference  = ConfigFactory.load();
-        File conf_file = new 
File("target/test-classes/ElasticsearchPersistWriterIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = 
ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
-        Config typesafe  = 
testResourceConfig.withFallback(reference).resolve();
-        testConfiguration = new 
ComponentConfigurator<>(ElasticsearchWriterConfiguration.class).detectConfiguration(typesafe,
 "elasticsearch");
-        testClient = new 
ElasticsearchClientManager(testConfiguration).getClient();
+    Config reference  = ConfigFactory.load();
+    File conf_file = new 
File("target/test-classes/ElasticsearchPersistWriterIT.conf");
+    assert(conf_file.exists());
+    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, 
ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new 
ComponentConfigurator<>(ElasticsearchWriterConfiguration.class).detectConfiguration(typesafe,
 "elasticsearch");
+    testClient = new ElasticsearchClientManager(testConfiguration).getClient();
 
-        ClusterHealthRequest clusterHealthRequest = 
Requests.clusterHealthRequest();
-        ClusterHealthResponse clusterHealthResponse = 
testClient.admin().cluster().health(clusterHealthRequest).actionGet();
-        assertNotEquals(clusterHealthResponse.getStatus(), 
ClusterHealthStatus.RED);
+    ClusterHealthRequest clusterHealthRequest = 
Requests.clusterHealthRequest();
+    ClusterHealthResponse clusterHealthResponse = 
testClient.admin().cluster().health(clusterHealthRequest).actionGet();
+    assertNotEquals(clusterHealthResponse.getStatus(), 
ClusterHealthStatus.RED);
 
-        IndicesExistsRequest indicesExistsRequest = 
Requests.indicesExistsRequest(testConfiguration.getIndex());
-        IndicesExistsResponse indicesExistsResponse = 
testClient.admin().indices().exists(indicesExistsRequest).actionGet();
-        if(indicesExistsResponse.isExists()) {
-            DeleteIndexRequest deleteIndexRequest = 
Requests.deleteIndexRequest(testConfiguration.getIndex());
-            DeleteIndexResponse deleteIndexResponse = 
testClient.admin().indices().delete(deleteIndexRequest).actionGet();
-            assertTrue(deleteIndexResponse.isAcknowledged());
-        };
+    IndicesExistsRequest indicesExistsRequest = 
Requests.indicesExistsRequest(testConfiguration.getIndex());
+    IndicesExistsResponse indicesExistsResponse = 
testClient.admin().indices().exists(indicesExistsRequest).actionGet();
+    if(indicesExistsResponse.isExists()) {
+      DeleteIndexRequest deleteIndexRequest = 
Requests.deleteIndexRequest(testConfiguration.getIndex());
+      DeleteIndexResponse deleteIndexResponse = 
testClient.admin().indices().delete(deleteIndexRequest).actionGet();
+      assertTrue(deleteIndexResponse.isAcknowledged());
+    };
 
-    }
+  }
 
-    @Test
-    public void testPersistWriter() throws Exception {
+  @Test
+  public void testPersistWriter() throws Exception {
 
-        ElasticsearchPersistWriter testPersistWriter = new 
ElasticsearchPersistWriter(testConfiguration);
-        testPersistWriter.prepare(null);
+    ElasticsearchPersistWriter testPersistWriter = new 
ElasticsearchPersistWriter(testConfiguration);
+    testPersistWriter.prepare(null);
 
-        InputStream testActivityFolderStream = 
ElasticsearchPersistWriterIT.class.getClassLoader()
-               .getResourceAsStream("activities");
-        List<String> files = IOUtils.readLines(testActivityFolderStream, 
Charsets.UTF_8);
+    InputStream testActivityFolderStream = 
ElasticsearchPersistWriterIT.class.getClassLoader()
+        .getResourceAsStream("activities");
+    List<String> files = IOUtils.readLines(testActivityFolderStream, 
Charsets.UTF_8);
 
-        for( String file : files) {
-           LOGGER.info("File: " + file );
-           InputStream testActivityFileStream = 
ElasticsearchPersistWriterIT.class.getClassLoader()
-                   .getResourceAsStream("activities/" + file);
-           Activity activity = MAPPER.readValue(testActivityFileStream, 
Activity.class);
-           StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
-           testPersistWriter.write( datum );
-           LOGGER.info("Wrote: " + activity.getVerb() );
-        }
+    for( String file : files) {
+      LOGGER.info("File: " + file );
+      InputStream testActivityFileStream = 
ElasticsearchPersistWriterIT.class.getClassLoader()
+          .getResourceAsStream("activities/" + file);
+      Activity activity = MAPPER.readValue(testActivityFileStream, 
Activity.class);
+      StreamsDatum datum = new StreamsDatum(activity, activity.getVerb());
+      testPersistWriter.write( datum );
+      LOGGER.info("Wrote: " + activity.getVerb() );
+    }
 
-        testPersistWriter.cleanUp();
+    testPersistWriter.cleanUp();
 
-        SearchRequestBuilder countRequest = testClient
-                .prepareSearch(testConfiguration.getIndex())
-                .setTypes(testConfiguration.getType());
-        SearchResponse countResponse = countRequest.execute().actionGet();
+    SearchRequestBuilder countRequest = testClient
+        .prepareSearch(testConfiguration.getIndex())
+        .setTypes(testConfiguration.getType());
+    SearchResponse countResponse = countRequest.execute().actionGet();
 
-        assertEquals(89, countResponse.getHits().getTotalHits());
+    assertEquals(89, countResponse.getHits().getTotalHits());
 
-    }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java
 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java
index ab45cf3..76f10b1 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/TestMetadataFromDocumentProcessor.java
@@ -18,25 +18,20 @@
 
 package org.apache.streams.elasticsearch.test;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Sets;
-import org.apache.commons.io.Charsets;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.SerializationUtils;
 import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.elasticsearch.processor.DocumentToMetadataProcessor;
 import 
org.apache.streams.elasticsearch.processor.MetadataFromDocumentProcessor;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.ActivityObject;
-import org.junit.Assert;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.Sets;
+
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.SerializationUtils;
 import org.junit.Before;
 import org.junit.Test;
-import org.reflections.Reflections;
-import org.reflections.scanners.SubTypesScanner;
-import org.reflections.util.ClasspathHelper;
-import org.reflections.util.ConfigurationBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,88 +40,89 @@ import java.util.List;
 import java.util.Set;
 
 /**
- * Created by sblackmon on 10/20/14.
+ * Unit Test for
+ * @see 
org.apache.streams.elasticsearch.processor.MetadataFromDocumentProcessor
  */
 public class TestMetadataFromDocumentProcessor {
 
-    private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
+  private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(TestMetadataFromDocumentProcessor.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(TestMetadataFromDocumentProcessor.class);
 
-    @Before
-    public void prepareTest() {
+  @Before
+  public void prepareTest() {
 
-    }
+  }
 
-    @Test
-    public void testSerializability() {
-        MetadataFromDocumentProcessor processor = new 
MetadataFromDocumentProcessor();
+  @Test
+  public void testSerializability() {
+    MetadataFromDocumentProcessor processor = new 
MetadataFromDocumentProcessor();
 
-        MetadataFromDocumentProcessor clone = (MetadataFromDocumentProcessor) 
SerializationUtils.clone(processor);
-    }
+    MetadataFromDocumentProcessor clone = (MetadataFromDocumentProcessor) 
SerializationUtils.clone(processor);
+  }
 
-    @Test
-    public void testMetadataFromDocumentProcessor() throws Exception {
+  @Test
+  public void testMetadataFromDocumentProcessor() throws Exception {
 
-        MetadataFromDocumentProcessor processor = new 
MetadataFromDocumentProcessor();
+    MetadataFromDocumentProcessor processor = new 
MetadataFromDocumentProcessor();
 
-        processor.prepare(null);
+    processor.prepare(null);
 
-        InputStream testActivityFolderStream = 
TestMetadataFromDocumentProcessor.class.getClassLoader()
-                .getResourceAsStream("activities");
-        List<String> files = IOUtils.readLines(testActivityFolderStream, 
Charsets.UTF_8);
+    InputStream testActivityFolderStream = 
TestMetadataFromDocumentProcessor.class.getClassLoader()
+        .getResourceAsStream("activities");
+    List<String> files = IOUtils.readLines(testActivityFolderStream, 
Charsets.UTF_8);
 
-        Set<ActivityObject> objects = Sets.newHashSet();
+    Set<ActivityObject> objects = Sets.newHashSet();
 
-        for( String file : files) {
-            LOGGER.info("File: " + file );
-            InputStream testActivityFileStream = 
TestMetadataFromDocumentProcessor.class.getClassLoader()
-                    .getResourceAsStream("activities/" + file);
-            Activity activity = MAPPER.readValue(testActivityFileStream, 
Activity.class);
-            activity.setId(activity.getVerb());
-            activity.getAdditionalProperties().remove("$license");
+    for( String file : files) {
+      LOGGER.info("File: " + file );
+      InputStream testActivityFileStream = 
TestMetadataFromDocumentProcessor.class.getClassLoader()
+          .getResourceAsStream("activities/" + file);
+      Activity activity = MAPPER.readValue(testActivityFileStream, 
Activity.class);
+      activity.setId(activity.getVerb());
+      activity.getAdditionalProperties().remove("$license");
 
-            if( activity.getActor().getObjectType() != null)
-                objects.add(activity.getActor());
-            if( activity.getObject().getObjectType() != null)
-                objects.add(activity.getObject());
+      if( activity.getActor().getObjectType() != null)
+        objects.add(activity.getActor());
+      if( activity.getObject().getObjectType() != null)
+        objects.add(activity.getObject());
 
-            StreamsDatum datum = new StreamsDatum(activity);
+      StreamsDatum datum = new StreamsDatum(activity);
 
-            List<StreamsDatum> resultList = processor.process(datum);
-            assert(resultList != null);
-            assert(resultList.size() == 1);
+      List<StreamsDatum> resultList = processor.process(datum);
+      assert(resultList != null);
+      assert(resultList.size() == 1);
 
-            StreamsDatum result = resultList.get(0);
-            assert(result != null);
-            assert(result.getDocument() != null);
-            assert(result.getId() != null);
-            assert(result.getMetadata() != null);
-            assert(result.getMetadata().get("id") != null);
-            assert(result.getMetadata().get("type") != null);
+      StreamsDatum result = resultList.get(0);
+      assert(result != null);
+      assert(result.getDocument() != null);
+      assert(result.getId() != null);
+      assert(result.getMetadata() != null);
+      assert(result.getMetadata().get("id") != null);
+      assert(result.getMetadata().get("type") != null);
 
-            LOGGER.info("valid: " + activity.getVerb() );
-        }
+      LOGGER.info("valid: " + activity.getVerb() );
+    }
 
-        for( ActivityObject activityObject : objects) {
-            LOGGER.info("Object: " + 
MAPPER.writeValueAsString(activityObject));
+    for( ActivityObject activityObject : objects) {
+      LOGGER.info("Object: " + MAPPER.writeValueAsString(activityObject));
 
-            activityObject.setId(activityObject.getObjectType());
-            StreamsDatum datum = new StreamsDatum(activityObject);
+      activityObject.setId(activityObject.getObjectType());
+      StreamsDatum datum = new StreamsDatum(activityObject);
 
-            List<StreamsDatum> resultList = processor.process(datum);
-            assert(resultList != null);
-            assert(resultList.size() == 1);
+      List<StreamsDatum> resultList = processor.process(datum);
+      assert(resultList != null);
+      assert(resultList.size() == 1);
 
-            StreamsDatum result = resultList.get(0);
-            assert(result != null);
-            assert(result.getDocument() != null);
-            assert(result.getId() != null);
-            assert(result.getMetadata() != null);
-            assert(result.getMetadata().get("id") != null);
-            assert(result.getMetadata().get("type") != null);
+      StreamsDatum result = resultList.get(0);
+      assert(result != null);
+      assert(result.getDocument() != null);
+      assert(result.getId() != null);
+      assert(result.getMetadata() != null);
+      assert(result.getMetadata().get("id") != null);
+      assert(result.getMetadata().get("type") != null);
 
-            LOGGER.info("valid: " + activityObject.getObjectType() );
-        }
+      LOGGER.info("valid: " + activityObject.getObjectType() );
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java
 
b/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java
index 504ea5e..b921ba5 100644
--- 
a/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java
+++ 
b/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistReader.java
@@ -18,15 +18,17 @@
 
 package org.apache.streams.filebuffer;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Queues;
-import com.squareup.tape.QueueFile;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistReader;
 import org.apache.streams.core.StreamsResultSet;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Queues;
+import com.squareup.tape.QueueFile;
+
 import org.joda.time.DateTime;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,135 +50,135 @@ import java.util.concurrent.Executors;
  */
 public class FileBufferPersistReader implements StreamsPersistReader, 
Serializable {
 
-    public static final String STREAMS_ID = "FileBufferPersistReader";
+  public static final String STREAMS_ID = "FileBufferPersistReader";
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(FileBufferPersistReader.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FileBufferPersistReader.class);
 
-    protected volatile Queue<StreamsDatum> persistQueue;
+  protected volatile Queue<StreamsDatum> persistQueue;
 
-    private ObjectMapper mapper;
+  private ObjectMapper mapper;
 
-    private FileBufferConfiguration config;
+  private FileBufferConfiguration config;
 
-    private QueueFile queueFile;
+  private QueueFile queueFile;
 
-    private boolean isStarted = false;
-    private boolean isStopped = false;
+  private boolean isStarted = false;
+  private boolean isStopped = false;
 
-    private ExecutorService executor = Executors.newSingleThreadExecutor();
+  private ExecutorService executor = Executors.newSingleThreadExecutor();
 
-    public FileBufferPersistReader() {
-        this(new ComponentConfigurator<>(FileBufferConfiguration.class)
-          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("filebuffer")));
-    }
+  public FileBufferPersistReader() {
+    this(new ComponentConfigurator<>(FileBufferConfiguration.class)
+        
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("filebuffer")));
+  }
 
-    public FileBufferPersistReader(FileBufferConfiguration config) {
-        this.config = config;
-    }
+  public FileBufferPersistReader(FileBufferConfiguration config) {
+    this.config = config;
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    @Override
-    public StreamsResultSet readAll() {
-        return readCurrent();
-    }
+  @Override
+  public StreamsResultSet readAll() {
+    return readCurrent();
+  }
 
-    @Override
-    public void startStream() {
-        isStarted = true;
-    }
+  @Override
+  public void startStream() {
+    isStarted = true;
+  }
 
-    @Override
-    public StreamsResultSet readCurrent() {
-
-        while (!queueFile.isEmpty()) {
-            try {
-                byte[] bytes = queueFile.peek();
-                ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
-                BufferedReader buf = new BufferedReader(new 
InputStreamReader(bais));
-                String s = buf.readLine();
-                LOGGER.debug(s);
-                write(new StreamsDatum(s));
-                queueFile.remove();
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-        }
-
-        StreamsResultSet current;
-        current = new 
StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
-        persistQueue.clear();
-
-        return current;
-    }
+  @Override
+  public StreamsResultSet readCurrent() {
 
-    private void write( StreamsDatum entry ) {
-        persistQueue.offer(entry);
+    while (!queueFile.isEmpty()) {
+      try {
+        byte[] bytes = queueFile.peek();
+        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+        BufferedReader buf = new BufferedReader(new InputStreamReader(bais));
+        String line = buf.readLine();
+        LOGGER.debug(line);
+        write(new StreamsDatum(line));
+        queueFile.remove();
+      } catch (IOException ex) {
+        ex.printStackTrace();
+      }
     }
 
-    @Override
-    public StreamsResultSet readNew(BigInteger bigInteger) {
-        return null;
-    }
+    StreamsResultSet current;
+    current = new 
StreamsResultSet(Queues.newConcurrentLinkedQueue(persistQueue));
+    persistQueue.clear();
 
-    @Override
-    public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
-        return null;
-    }
+    return current;
+  }
 
-    @Override
-    public boolean isRunning() {
-        return isStarted && !isStopped;
-    }
+  private void write( StreamsDatum entry ) {
+    persistQueue.offer(entry);
+  }
 
-    @Override
-    public void prepare(Object configurationObject) {
+  @Override
+  public StreamsResultSet readNew(BigInteger bigInteger) {
+    return null;
+  }
 
-        try {
-            Thread.sleep(1000);
-        } catch (InterruptedException ie) {
-            //Handle exception
-        }
+  @Override
+  public StreamsResultSet readRange(DateTime dateTime, DateTime dateTime2) {
+    return null;
+  }
 
-        mapper = new ObjectMapper();
+  @Override
+  public boolean isRunning() {
+    return isStarted && !isStopped;
+  }
 
-        File file = new File( config.getPath());
+  @Override
+  public void prepare(Object configurationObject) {
 
-        if( !file.exists() ) {
-            try {
-                file.createNewFile();
-            } catch (IOException e) {
-                LOGGER.error(e.getMessage());
-            }
-        }
+    try {
+      Thread.sleep(1000);
+    } catch (InterruptedException ie) {
+      //Handle exception
+    }
 
-        Preconditions.checkArgument(file.exists());
-        Preconditions.checkArgument(file.canRead());
+    mapper = new ObjectMapper();
 
-        try {
-            queueFile = new QueueFile(file);
-        } catch (IOException e) {
-            LOGGER.error(e.getMessage());
-        }
+    File file = new File( config.getPath());
 
-        Preconditions.checkNotNull(queueFile);
+    if ( !file.exists() ) {
+      try {
+        file.createNewFile();
+      } catch (IOException ex) {
+        LOGGER.error(ex.getMessage());
+      }
+    }
 
-        this.persistQueue = new ConcurrentLinkedQueue<>();
+    Preconditions.checkArgument(file.exists());
+    Preconditions.checkArgument(file.canRead());
 
+    try {
+      queueFile = new QueueFile(file);
+    } catch (IOException ex) {
+      LOGGER.error(ex.getMessage());
     }
 
-        @Override
-    public void cleanUp() {
-        try {
-            queueFile.close();
-        } catch (IOException e) {
-            e.printStackTrace();
-        } finally {
-            queueFile = null;
-            isStopped = true;
-        }
+    Preconditions.checkNotNull(queueFile);
+
+    this.persistQueue = new ConcurrentLinkedQueue<>();
+
+  }
+
+  @Override
+  public void cleanUp() {
+    try {
+      queueFile.close();
+    } catch (IOException ex) {
+      ex.printStackTrace();
+    } finally {
+      queueFile = null;
+      isStopped = true;
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java
 
b/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java
index 4dea85c..76dfafc 100644
--- 
a/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java
+++ 
b/streams-contrib/streams-persist-filebuffer/src/main/java/org/apache/streams/filebuffer/FileBufferPersistWriter.java
@@ -18,15 +18,17 @@
 
 package org.apache.streams.filebuffer;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.squareup.tape.QueueFile;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsPersistWriter;
 import org.apache.streams.util.GuidUtils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.squareup.tape.QueueFile;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,79 +43,79 @@ import java.util.concurrent.ConcurrentLinkedQueue;
  */
 public class FileBufferPersistWriter implements StreamsPersistWriter, 
Serializable {
 
-    public final static String STREAMS_ID = "FileBufferPersistWriter";
+  public static final String STREAMS_ID = "FileBufferPersistWriter";
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(FileBufferPersistWriter.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(FileBufferPersistWriter.class);
 
-    protected volatile Queue<StreamsDatum> persistQueue;
+  protected volatile Queue<StreamsDatum> persistQueue;
 
-    private ObjectMapper mapper;
+  private ObjectMapper mapper;
 
-    private FileBufferConfiguration config;
+  private FileBufferConfiguration config;
 
-    private QueueFile queueFile;
+  private QueueFile queueFile;
 
-    public FileBufferPersistWriter() {
-       this(new ComponentConfigurator<>(FileBufferConfiguration.class)
-         
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("filebuffer")));
-    }
+  public FileBufferPersistWriter() {
+    this(new ComponentConfigurator<>(FileBufferConfiguration.class)
+        
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("filebuffer")));
+  }
 
-    public FileBufferPersistWriter(FileBufferConfiguration config) {
-        this.config = config;
-    }
+  public FileBufferPersistWriter(FileBufferConfiguration config) {
+    this.config = config;
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    @Override
-    public void write(StreamsDatum entry) {
+  @Override
+  public void write(StreamsDatum entry) {
 
-        String key = entry.getId() != null ? entry.getId() : 
GuidUtils.generateGuid("filewriter");
+    String key = entry.getId() != null ? entry.getId() : 
GuidUtils.generateGuid("filewriter");
 
-        Preconditions.checkArgument(!Strings.isNullOrEmpty(key));
-        Preconditions.checkArgument(entry.getDocument() instanceof String);
-        Preconditions.checkArgument(!Strings.isNullOrEmpty((String) 
entry.getDocument()));
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(key));
+    Preconditions.checkArgument(entry.getDocument() instanceof String);
+    Preconditions.checkArgument(!Strings.isNullOrEmpty((String) 
entry.getDocument()));
 
-        byte[] item = ((String)entry.getDocument()).getBytes();
-        try {
-            queueFile.add(item);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
+    byte[] item = ((String)entry.getDocument()).getBytes();
+    try {
+      queueFile.add(item);
+    } catch (IOException ex) {
+      ex.printStackTrace();
     }
+  }
 
-    @Override
-    public void prepare(Object configurationObject) {
+  @Override
+  public void prepare(Object configurationObject) {
 
-        mapper = new ObjectMapper();
+    mapper = new ObjectMapper();
 
-        File file = new File( config.getPath());
+    File file = new File( config.getPath());
 
-        try {
-            queueFile = new QueueFile(file);
-        } catch (IOException e) {
-            e.printStackTrace();
-        }
+    try {
+      queueFile = new QueueFile(file);
+    } catch (IOException ex) {
+      ex.printStackTrace();
+    }
 
-        Preconditions.checkArgument(file.exists());
-        Preconditions.checkArgument(file.canWrite());
+    Preconditions.checkArgument(file.exists());
+    Preconditions.checkArgument(file.canWrite());
 
-        Preconditions.checkNotNull(queueFile);
+    Preconditions.checkNotNull(queueFile);
 
-        this.persistQueue  = new ConcurrentLinkedQueue<>();
+    this.persistQueue  = new ConcurrentLinkedQueue<>();
 
-    }
+  }
 
-    @Override
-    public void cleanUp() {
-        try {
-            queueFile.close();
-        } catch (IOException e) {
-            e.printStackTrace();
-        } finally {
-            queueFile = null;
-        }
+  @Override
+  public void cleanUp() {
+    try {
+      queueFile.close();
+    } catch (IOException ex) {
+      ex.printStackTrace();
+    } finally {
+      queueFile = null;
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
 
b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
index 3c97fd7..847328a 100644
--- 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
+++ 
b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphHttpPersistWriter.java
@@ -18,16 +18,6 @@
 
 package org.apache.streams.graph;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import org.apache.http.HttpEntity;
-import org.apache.http.client.methods.CloseableHttpResponse;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.util.EntityUtils;
 import org.apache.streams.components.http.HttpPersistWriterConfiguration;
 import org.apache.streams.components.http.persist.SimpleHTTPPostPersistWriter;
 import org.apache.streams.config.ComponentConfigurator;
@@ -39,6 +29,18 @@ import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.ActivityObject;
 import org.apache.streams.pojo.json.Provider;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.util.EntityUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,190 +51,203 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * Adds activityobjects as vertices and activities as edges to a graph 
database with
- * an http rest endpoint (such as neo4j)
+ * an http rest endpoint (such as neo4j).
  */
 public class GraphHttpPersistWriter extends SimpleHTTPPostPersistWriter {
 
-    public static final String STREAMS_ID = 
GraphHttpPersistWriter.class.getCanonicalName();
+  public static final String STREAMS_ID = 
GraphHttpPersistWriter.class.getCanonicalName();
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(GraphHttpPersistWriter.class);
-    private final static long MAX_WRITE_LATENCY = 1000;
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(GraphHttpPersistWriter.class);
+  private static final long MAX_WRITE_LATENCY = 1000;
 
-    protected GraphHttpConfiguration configuration;
+  protected GraphHttpConfiguration configuration;
 
-    protected QueryGraphHelper queryGraphHelper;
-    protected HttpGraphHelper httpGraphHelper;
+  protected QueryGraphHelper queryGraphHelper;
+  protected HttpGraphHelper httpGraphHelper;
 
-    private static ObjectMapper mapper;
+  private static ObjectMapper mapper;
 
-    protected final ReadWriteLock lock = new ReentrantReadWriteLock();
+  protected final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-    public GraphHttpPersistWriter() {
-        this(new 
ComponentConfigurator<GraphHttpConfiguration>(GraphHttpConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph")));
-    }
+  /**
+   * GraphHttpPersistWriter constructor - resolve GraphHttpConfiguration from 
JVM 'graph'.
+   */
+  public GraphHttpPersistWriter() {
+    this(new 
ComponentConfigurator<GraphHttpConfiguration>(GraphHttpConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph")));
+  }
 
-    public GraphHttpPersistWriter(GraphHttpConfiguration configuration) {
-        super(StreamsJacksonMapper.getInstance().convertValue(configuration, 
HttpPersistWriterConfiguration.class));
-        if( 
configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) {
-            super.configuration.setResourcePath("/db/" + 
configuration.getGraph() + "/transaction/commit/");
+  /**
+   * GraphHttpPersistWriter constructor - use supplied GraphHttpConfiguration.
+   * @param configuration GraphHttpConfiguration
+   */
+  public GraphHttpPersistWriter(GraphHttpConfiguration configuration) {
+    super(StreamsJacksonMapper.getInstance().convertValue(configuration, 
HttpPersistWriterConfiguration.class));
+    if ( configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) {
+      super.configuration.setResourcePath("/db/" + configuration.getGraph() + 
"/transaction/commit/");
+    } else if ( 
configuration.getType().equals(GraphHttpConfiguration.Type.REXSTER)) {
+      super.configuration.setResourcePath("/graphs/" + 
configuration.getGraph());
+    }
+    this.configuration = configuration;
+  }
+
+  @Override
+  protected ObjectNode preparePayload(StreamsDatum entry) throws Exception {
+
+    Activity activity = null;
+    ActivityObject activityObject = null;
+    Object document = entry.getDocument();
+
+    if (document instanceof Activity) {
+      activity = (Activity) document;
+      activityObject = activity.getObject();
+    } else if (document instanceof ActivityObject) {
+      activityObject = (ActivityObject) document;
+    } else {
+      ObjectNode objectNode;
+      if (document instanceof ObjectNode) {
+        objectNode = (ObjectNode) document;
+      } else if ( document instanceof String) {
+        try {
+          objectNode = mapper.readValue((String) document, ObjectNode.class);
+        } catch (IOException ex) {
+          LOGGER.error("Can't handle input: ", entry);
+          throw ex;
         }
-        else if( 
configuration.getType().equals(GraphHttpConfiguration.Type.REXSTER)) {
-            super.configuration.setResourcePath("/graphs/" + 
configuration.getGraph());
+      } else {
+        LOGGER.error("Can't handle input: ", entry);
+        throw new Exception("Can't create payload from datum.");
+      }
+
+      if ( objectNode.get("verb") != null ) {
+        try {
+          activity = mapper.convertValue(objectNode, Activity.class);
+          activityObject = activity.getObject();
+        } catch (Exception ex) {
+          activityObject = mapper.convertValue(objectNode, 
ActivityObject.class);
         }
-        this.configuration = configuration;
+      } else {
+        activityObject = mapper.convertValue(objectNode, ActivityObject.class);
+      }
     }
 
-    @Override
-    protected ObjectNode preparePayload(StreamsDatum entry) throws Exception {
-
-        Activity activity = null;
-        ActivityObject activityObject = null;
-        Object document = entry.getDocument();
-
-        if (document instanceof Activity) {
-            activity = (Activity) document;
-            activityObject = activity.getObject();
-        } else if (document instanceof ActivityObject) {
-            activityObject = (ActivityObject) document;
-        } else {
-            ObjectNode objectNode;
-            if (document instanceof ObjectNode) {
-                objectNode = (ObjectNode) document;
-            } else if( document instanceof String) {
-                try {
-                    objectNode = mapper.readValue((String) document, 
ObjectNode.class);
-                } catch (IOException e) {
-                    LOGGER.error("Can't handle input: ", entry);
-                    throw e;
-                }
-            } else {
-                LOGGER.error("Can't handle input: ", entry);
-                throw new Exception("Can't create payload from datum.");
-            }
-
-            if( objectNode.get("verb") != null ) {
-                try {
-                    activity = mapper.convertValue(objectNode, Activity.class);
-                    activityObject = activity.getObject();
-                } catch (Exception e) {
-                    activityObject = mapper.convertValue(objectNode, 
ActivityObject.class);
-                }
-            } else {
-                activityObject = mapper.convertValue(objectNode, 
ActivityObject.class);
-            }
-        }
+    Preconditions.checkArgument(activity != null || activityObject != null);
+
+    ObjectNode request = mapper.createObjectNode();
+    ArrayNode statements = mapper.createArrayNode();
+
+    // always add vertices first
 
-        Preconditions.checkArgument(activity != null || activityObject != 
null);
+    List<String> labels = Lists.newArrayList("streams");
 
-        ObjectNode request = mapper.createObjectNode();
-        ArrayNode statements = mapper.createArrayNode();
+    if ( activityObject != null ) {
+      if ( activityObject.getObjectType() != null ) {
+        labels.add(activityObject.getObjectType());
+      }
+      
statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(activityObject)));
+    }
 
-        // always add vertices first
+    if ( activity != null ) {
 
-        List<String> labels = Lists.newArrayList("streams");
+      ActivityObject actor = activity.getActor();
+      Provider provider = activity.getProvider();
 
-        if( activityObject != null ) {
-            if (activityObject.getObjectType() != null)
-                labels.add(activityObject.getObjectType());
-            
statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(activityObject)));
+      if ( provider != null
+          && !Strings.isNullOrEmpty(provider.getId()) ) {
+        labels.add(provider.getId());
+      }
+      if (actor != null
+          && !Strings.isNullOrEmpty(actor.getId())) {
+        if (actor.getObjectType() != null) {
+          labels.add(actor.getObjectType());
         }
+        
statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(actor)));
+      }
 
-        if( activity != null ) {
-
-            ActivityObject actor = activity.getActor();
-            Provider provider = activity.getProvider();
-
-            if( provider != null &&
-                    !Strings.isNullOrEmpty(provider.getId()) ) {
-                labels.add(provider.getId());
-            }
-            if (actor != null &&
-                    !Strings.isNullOrEmpty(actor.getId())) {
-                if (actor.getObjectType() != null)
-                    labels.add(actor.getObjectType());
-                
statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(actor)));
-            }
-
-            if (activityObject != null &&
-                    !Strings.isNullOrEmpty(activityObject.getId())) {
-                if (activityObject.getObjectType() != null)
-                    labels.add(activityObject.getObjectType());
-                
statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(activityObject)));
-            }
-
-            // then add edge
-
-            if (!Strings.isNullOrEmpty(activity.getVerb())) {
-                
statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.createEdgeRequest(activity)));
-            }
+      if (activityObject != null
+          && !Strings.isNullOrEmpty(activityObject.getId())) {
+        if (activityObject.getObjectType() != null) {
+          labels.add(activityObject.getObjectType());
         }
+        
statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.mergeVertexRequest(activityObject)));
+      }
 
-        request.put("statements", statements);
-        return request;
+      // then add edge
 
+      if (!Strings.isNullOrEmpty(activity.getVerb())) {
+        
statements.add(httpGraphHelper.createHttpRequest(queryGraphHelper.createEdgeRequest(activity)));
+      }
     }
 
-    @Override
-    protected ObjectNode executePost(HttpPost httpPost) {
-
-        Preconditions.checkNotNull(httpPost);
-
-        ObjectNode result = null;
-
-        CloseableHttpResponse response = null;
-
-        String entityString = null;
-        try {
-            response = httpclient.execute(httpPost);
-            HttpEntity entity = response.getEntity();
-            if (response.getStatusLine().getStatusCode() == 200 || 
response.getStatusLine().getStatusCode() == 201 && entity != null) {
-                entityString = EntityUtils.toString(entity);
-                result = mapper.readValue(entityString, ObjectNode.class);
-            }
-            LOGGER.debug("Writer response:\n{}\n{}\n{}", httpPost.toString(), 
response.getStatusLine().getStatusCode(), entityString);
-            if( result == null ||
-                    (
-                        result.get("errors") != null &&
-                        result.get("errors").isArray() &&
-                        result.get("errors").iterator().hasNext()
-                    )
-                ) {
-                LOGGER.error("Write Error: " + result.get("errors"));
-            } else {
-                LOGGER.debug("Write Success");
-            }
-        } catch (IOException e) {
-            LOGGER.error("IO error:\n{}\n{}\n{}", httpPost.toString(), 
response, e.getMessage());
-        } catch (Exception e) {
-            LOGGER.error("Write Exception:\n{}\n{}\n{}", httpPost.toString(), 
response, e.getMessage());
-        } finally {
-            try {
-                if( response != null) response.close();
-            } catch (IOException e) {}
+    request.put("statements", statements);
+    return request;
+
+  }
+
+  @Override
+  protected ObjectNode executePost(HttpPost httpPost) {
+
+    Preconditions.checkNotNull(httpPost);
+
+    ObjectNode result = null;
+
+    CloseableHttpResponse response = null;
+
+    String entityString = null;
+    try {
+      response = httpclient.execute(httpPost);
+      HttpEntity entity = response.getEntity();
+      if (response.getStatusLine().getStatusCode() == 200 || 
response.getStatusLine().getStatusCode() == 201 && entity != null) {
+        entityString = EntityUtils.toString(entity);
+        result = mapper.readValue(entityString, ObjectNode.class);
+      }
+      LOGGER.debug("Writer response:\n{}\n{}\n{}", httpPost.toString(), 
response.getStatusLine().getStatusCode(), entityString);
+      if ( result == null
+           || (
+              result.get("errors") != null
+                  && result.get("errors").isArray()
+                  && result.get("errors").iterator().hasNext()
+              )
+          ) {
+        LOGGER.error("Write Error: " + result.get("errors"));
+      } else {
+        LOGGER.debug("Write Success");
+      }
+    } catch (IOException ex) {
+      LOGGER.error("IO error:\n{}\n{}\n{}", httpPost.toString(), response, 
ex.getMessage());
+    } catch (Exception ex) {
+      LOGGER.error("Write Exception:\n{}\n{}\n{}", httpPost.toString(), 
response, ex.getMessage());
+    } finally {
+      try {
+        if ( response != null) {
+          response.close();
         }
-        return result;
+      } catch (IOException ignored) {
+        LOGGER.trace("ignored IOException", ignored);
+      }
     }
+    return result;
+  }
 
-    @Override
-    public void prepare(Object configurationObject) {
-
-        super.prepare(configuration);
-        mapper = StreamsJacksonMapper.getInstance();
+  @Override
+  public void prepare(Object configurationObject) {
 
-        if( 
configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) {
-            queryGraphHelper = new CypherQueryGraphHelper();
-            httpGraphHelper = new Neo4jHttpGraphHelper();
-        }
+    super.prepare(configuration);
+    mapper = StreamsJacksonMapper.getInstance();
 
-        Preconditions.checkNotNull(queryGraphHelper);
-        Preconditions.checkNotNull(httpGraphHelper);
+    if ( configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) {
+      queryGraphHelper = new CypherQueryGraphHelper();
+      httpGraphHelper = new Neo4jHttpGraphHelper();
     }
 
-    @Override
-    public void cleanUp() {
+    Preconditions.checkNotNull(queryGraphHelper);
+    Preconditions.checkNotNull(httpGraphHelper);
+  }
 
-        LOGGER.info("exiting");
+  @Override
+  public void cleanUp() {
 
-    }
+    LOGGER.info("exiting");
+
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
 
b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
index 731159f..7c6e341 100644
--- 
a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
+++ 
b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/GraphVertexReader.java
@@ -18,10 +18,6 @@
 
 package org.apache.streams.graph;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.collect.Lists;
 import org.apache.streams.components.http.HttpProviderConfiguration;
 import org.apache.streams.components.http.provider.SimpleHttpProvider;
 import org.apache.streams.config.ComponentConfigurator;
@@ -33,6 +29,12 @@ import org.apache.streams.graph.neo4j.CypherQueryResponse;
 import org.apache.streams.graph.neo4j.ItemData;
 import org.apache.streams.graph.neo4j.ItemMetadata;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.collect.Lists;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,76 +42,86 @@ import java.util.List;
 
 /**
  * Reads a stream of activityobjects from vertices in a graph database with
- * an http rest endpoint (such as neo4j)
+ * an http rest endpoint (such as neo4j).
  */
 public class GraphVertexReader extends SimpleHttpProvider implements 
StreamsPersistReader {
 
-    public static final String STREAMS_ID = 
GraphVertexReader.class.getCanonicalName();
+  public static final String STREAMS_ID = 
GraphVertexReader.class.getCanonicalName();
 
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(GraphVertexReader.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(GraphVertexReader.class);
 
-    protected GraphReaderConfiguration configuration;
+  protected GraphReaderConfiguration configuration;
 
-    private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
+  private static ObjectMapper mapper = StreamsJacksonMapper.getInstance();
 
-    public GraphVertexReader() {
-        this(new 
ComponentConfigurator<GraphReaderConfiguration>(GraphReaderConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph")));
-    }
+  /**
+   * GraphVertexReader constructor - resolve GraphReaderConfiguration from JVM 
'graph'.
+   */
+  public GraphVertexReader() {
+    this(new 
ComponentConfigurator<GraphReaderConfiguration>(GraphReaderConfiguration.class).detectConfiguration(StreamsConfigurator.config.getConfig("graph")));
+  }
 
-    public GraphVertexReader(GraphReaderConfiguration configuration) {
-        super(mapper.convertValue(configuration, 
HttpProviderConfiguration.class));
-        if( 
configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J))
-            super.configuration.setResourcePath("/db/" + 
configuration.getGraph() + "/transaction/commit");
-        else if( 
configuration.getType().equals(GraphHttpConfiguration.Type.REXSTER))
-            super.configuration.setResourcePath("/graphs/" + 
configuration.getGraph());
-        this.configuration = configuration;
+  /**
+   * GraphVertexReader constructor - use supplied GraphReaderConfiguration.
+   * @param configuration GraphReaderConfiguration
+   */
+  public GraphVertexReader(GraphReaderConfiguration configuration) {
+    super(mapper.convertValue(configuration, HttpProviderConfiguration.class));
+    if ( configuration.getType().equals(GraphHttpConfiguration.Type.NEO_4_J)) {
+      super.configuration.setResourcePath("/db/" + configuration.getGraph() + 
"/transaction/commit");
+    } else if ( 
configuration.getType().equals(GraphHttpConfiguration.Type.REXSTER)) {
+      super.configuration.setResourcePath("/graphs/" + 
configuration.getGraph());
     }
+    this.configuration = configuration;
+  }
 
-    /*
-     * Neo API query returns something like this:
-     * { "columns": [ "v" ], "data": [ [ { "data": { props }, etc... } ], [ { 
"data": { props }, etc... } ] ] }
-     *
-     */
-    public List<ObjectNode> parse(JsonNode jsonNode) {
-        List<ObjectNode> results = Lists.newArrayList();
+  /**
+   * Neo API query returns something like this:
+   * { "columns": [ "v" ], "data": [ [ { "data": { props }, etc... } ], [ { 
"data": { props }, etc... } ] ] }
+   *
+   * @param jsonNode jsonNode
+   * @return result
+   */
+  public List<ObjectNode> parse(JsonNode jsonNode) {
+    List<ObjectNode> results = Lists.newArrayList();
 
-        ObjectNode root = (ObjectNode) jsonNode;
+    ObjectNode root = (ObjectNode) jsonNode;
 
-        CypherQueryResponse cypherQueryResponse = mapper.convertValue(root, 
CypherQueryResponse.class);
+    CypherQueryResponse cypherQueryResponse = mapper.convertValue(root, 
CypherQueryResponse.class);
 
-        for( List<List<ItemMetadata>> dataWrapper : 
cypherQueryResponse.getData()) {
+    for ( List<List<ItemMetadata>> dataWrapper : 
cypherQueryResponse.getData()) {
 
-            for (List<ItemMetadata> itemMetadatas : dataWrapper) {
+      for (List<ItemMetadata> itemMetadatas : dataWrapper) {
 
-                for (ItemMetadata itemMetadata : itemMetadatas) {
+        for (ItemMetadata itemMetadata : itemMetadatas) {
 
-                    ItemData itemData = itemMetadata.getData();
+          ItemData itemData = itemMetadata.getData();
 
-                    LOGGER.debug("itemData: " + itemData);
+          LOGGER.debug("itemData: " + itemData);
 
-                    
results.add(PropertyUtil.unflattenMap(itemData.getAdditionalProperties(), '.'));
-                }
+          
results.add(PropertyUtil.unflattenMap(itemData.getAdditionalProperties(), '.'));
+        }
 
-            }
+      }
 
-        }
-        return results;
     }
+    return results;
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    @Override
-    public void prepare(Object configurationObject) {
+  @Override
+  public void prepare(Object configurationObject) {
 
-        super.prepare(configurationObject);
+    super.prepare(configurationObject);
 
-    }
+  }
 
-    @Override
-    public StreamsResultSet readAll() {
-        return readCurrent();
-    }
+  @Override
+  public StreamsResultSet readAll() {
+    return readCurrent();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java
index 0833ba0..17b8840 100644
--- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/HttpGraphHelper.java
@@ -19,8 +19,7 @@
 package org.apache.streams.graph;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
+
 import org.javatuples.Pair;
 
 import java.util.Map;
@@ -31,6 +30,6 @@ import java.util.Map;
  */
 public interface HttpGraphHelper {
 
-    public ObjectNode createHttpRequest(Pair<String, Map<String, Object>> 
queryPlusParameters);
+  public ObjectNode createHttpRequest(Pair<String, Map<String, Object>> 
queryPlusParameters);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java
index eeacdae..1699aee 100644
--- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java
+++ b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/QueryGraphHelper.java
@@ -18,28 +18,27 @@
 
 package org.apache.streams.graph;
 
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
 import org.apache.streams.pojo.json.Activity;
 import org.apache.streams.pojo.json.ActivityObject;
+
 import org.javatuples.Pair;
 
 import java.util.Map;
 
 /**
  * Interface for methods allowing persistance to a graph database which uses a 
combination
- * DSL
+ * DSL.
  */
 public interface QueryGraphHelper {
 
-    public Pair<String, Map<String, Object>> getVertexRequest(String 
streamsId);
+  public Pair<String, Map<String, Object>> getVertexRequest(String streamsId);
 
-    public Pair<String, Map<String, Object>> getVertexRequest(Long vertexId);
+  public Pair<String, Map<String, Object>> getVertexRequest(Long vertexId);
 
-    public Pair<String, Map<String, Object>> 
createVertexRequest(ActivityObject activityObject);
+  public Pair<String, Map<String, Object>> createVertexRequest(ActivityObject 
activityObject);
 
-    public Pair<String, Map<String, Object>> mergeVertexRequest(ActivityObject 
activityObject);
+  public Pair<String, Map<String, Object>> mergeVertexRequest(ActivityObject 
activityObject);
 
-    public Pair<String, Map<String, Object>> createEdgeRequest(Activity 
activity);
+  public Pair<String, Map<String, Object>> createEdgeRequest(Activity 
activity);
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/BinaryGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/BinaryGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/BinaryGraphHelper.java
deleted file mode 100644
index 3dc8ffc..0000000
--- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/BinaryGraphHelper.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.graph.neo4j;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.streams.data.util.PropertyUtil;
-import org.apache.streams.graph.QueryGraphHelper;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-import org.javatuples.Pair;
-import org.javatuples.Quartet;
-import org.stringtemplate.v4.ST;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Supporting class for interacting with neo4j via rest API
- */
-public class BinaryGraphHelper {
-
-    private final static ObjectMapper mapper = 
StreamsJacksonMapper.getInstance();
-
-    public Pair<String, Map<String, Object>> 
createVertexRequest(ActivityObject activityObject) {
-
-        Preconditions.checkNotNull(activityObject.getObjectType());
-
-        ObjectNode object = mapper.convertValue(activityObject, 
ObjectNode.class);
-        Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
-
-        Pair<String, Map<String, Object>> queryPlusParameters = new 
Pair(props.get("id"), props);
-
-        return queryPlusParameters;
-    }
-
-    public Quartet<String, String, String, Map<String, Object>> 
createEdgeRequest(Activity activity) {
-
-        ObjectNode object = mapper.convertValue(activity, ObjectNode.class);
-        Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
-
-        Quartet createEdgeRequest = new Quartet(
-                activity.getActor().getId(),
-                activity.getObject().getId(),
-                activity.getId(),
-                props);
-
-        return createEdgeRequest;
-    }
-
-    public static String getPropertyValueSetter(Map<String, Object> map, 
String symbol) {
-        StringBuilder builder = new StringBuilder();
-        for( Map.Entry<String, Object> entry : map.entrySet()) {
-            if( entry.getValue() instanceof String ) {
-                String propVal = (String)(entry.getValue());
-                builder.append("," + symbol + ".`" + entry.getKey() + "` = '" 
+ propVal + "'");
-            }
-        }
-        return builder.toString();
-    }
-
-    public static String getPropertyParamSetter(Map<String, Object> map, 
String symbol) {
-        StringBuilder builder = new StringBuilder();
-        for( Map.Entry<String, Object> entry : map.entrySet()) {
-            if( entry.getValue() instanceof String ) {
-                String propVal = (String)(entry.getValue());
-                builder.append("," + symbol + ".`" + entry.getKey() + "` = '" 
+ propVal + "'");
-            }
-        }
-        return builder.toString();
-    }
-
-    public static String getPropertyCreater(Map<String, Object> map) {
-        StringBuilder builder = new StringBuilder();
-        builder.append("{");
-        List<String> parts = Lists.newArrayList();
-        for( Map.Entry<String, Object> entry : map.entrySet()) {
-            if( entry.getValue() instanceof String ) {
-                String propVal = (String) (entry.getValue());
-                parts.add("`"+entry.getKey() + "`:'" + propVal + "'");
-            }
-        }
-        builder.append(Joiner.on(",").join(parts));
-        builder.append("}");
-        return builder.toString();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphHelper.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphHelper.java b/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphHelper.java
deleted file mode 100644
index 8028350..0000000
--- a/streams-contrib/streams-persist-graph/src/main/java/org/apache/streams/graph/neo4j/CypherGraphHelper.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.streams.graph.neo4j;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import org.apache.streams.data.util.PropertyUtil;
-import org.apache.streams.jackson.StreamsJacksonMapper;
-import org.apache.streams.pojo.json.Activity;
-import org.apache.streams.pojo.json.ActivityObject;
-import org.stringtemplate.v4.ST;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Supporting class for interacting with neo4j via rest API
- */
-public class CypherGraphHelper implements org.apache.streams.graph.GraphHelper 
{
-
-    private final static ObjectMapper mapper = 
StreamsJacksonMapper.getInstance();
-
-    public final static String statementKey = "statement";
-    public final static String paramsKey = "parameters";
-    public final static String propsKey = "props";
-
-    public final static String getVertexLongIdStatementTemplate = "MATCH (v) 
WHERE ID(v) = <id> RETURN v";
-    public final static String getVertexStringIdStatementTemplate = "MATCH (v 
{id: '<id>'} ) RETURN v";
-
-    public final static String createVertexStatementTemplate = "MATCH (x {id: 
'<id>'}) "+
-                                                                "CREATE UNIQUE 
(v:<type> { props }) "+
-                                                                "ON CREATE SET 
v <labels> "+
-                                                                "RETURN v";
-
-    public final static String mergeVertexStatementTemplate = "MERGE (v:<type> 
{id: '<id>'}) "+
-                                                               "ON CREATE SET 
v <labels>, v = { props }, v.`@timestamp` = timestamp() "+
-                                                               "ON MATCH SET v 
<labels>, v = { props }, v.`@timestamp` = timestamp() "+
-                                                               "RETURN v";
-
-    public final static String createEdgeStatementTemplate = "MATCH 
(s:<s_type> {id: '<s_id>'}),(d:<d_type> {id: '<d_id>'}) "+
-                                                            "CREATE UNIQUE 
(s)-[r:<r_type> <r_props>]->(d) "+
-                                                            "RETURN r";
-
-    public ObjectNode getVertexRequest(String streamsId) {
-
-        ObjectNode request = mapper.createObjectNode();
-
-        ST getVertex = new ST(getVertexStringIdStatementTemplate);
-        getVertex.add("id", streamsId);
-        request.put(statementKey, getVertex.render());
-
-        return request;
-    }
-
-    @Override
-    public ObjectNode getVertexRequest(Long vertexId) {
-
-        ObjectNode request = mapper.createObjectNode();
-
-        ST getVertex = new ST(getVertexLongIdStatementTemplate);
-        getVertex.add("id", vertexId);
-        request.put(statementKey, getVertex.render());
-
-        return request;
-    }
-
-    public ObjectNode createVertexRequest(ActivityObject activityObject) {
-
-        Preconditions.checkNotNull(activityObject.getObjectType());
-
-        ObjectNode request = mapper.createObjectNode();
-
-        List<String> labels = Lists.newArrayList();
-        if( activityObject.getAdditionalProperties().containsKey("labels") ) {
-            List<String> extraLabels = 
(List<String>)activityObject.getAdditionalProperties().get("labels");
-            for( String extraLabel : extraLabels )
-                labels.add(":"+extraLabel);
-        }
-
-        ST createVertex = new ST(createVertexStatementTemplate);
-        createVertex.add("id", activityObject.getId());
-        createVertex.add("type", activityObject.getObjectType());
-        createVertex.add("labels", Joiner.on(' ').join(labels));
-        request.put(statementKey, createVertex.render());
-
-        ObjectNode params = mapper.createObjectNode();
-        ObjectNode object = mapper.convertValue(activityObject, 
ObjectNode.class);
-        ObjectNode props = PropertyUtil.flattenToObjectNode(object, '.');
-        params.put(propsKey, props);
-        request.put(paramsKey, params);
-
-        return request;
-    }
-
-    public ObjectNode mergeVertexRequest(ActivityObject activityObject) {
-
-        Preconditions.checkNotNull(activityObject.getObjectType());
-
-        ObjectNode request = mapper.createObjectNode();
-
-        List<String> labels = Lists.newArrayList();
-        if( activityObject.getAdditionalProperties().containsKey("labels") ) {
-            List<String> extraLabels = 
(List<String>)activityObject.getAdditionalProperties().get("labels");
-            for( String extraLabel : extraLabels )
-                labels.add(":"+extraLabel);
-        }
-
-        ST mergeVertex = new ST(mergeVertexStatementTemplate);
-        mergeVertex.add("id", activityObject.getId());
-        mergeVertex.add("type", activityObject.getObjectType());
-        mergeVertex.add("labels", Joiner.on(' ').join(labels));
-
-        ObjectNode params = mapper.createObjectNode();
-        ObjectNode object = mapper.convertValue(activityObject, 
ObjectNode.class);
-        ObjectNode props = PropertyUtil.flattenToObjectNode(object, '.');
-        params.put(propsKey, props);
-        request.put(paramsKey, params);
-
-        String statement = mergeVertex.render();
-
-        request.put(statementKey, statement);
-
-        return request;
-    }
-
-    public ObjectNode createEdgeRequest(Activity activity, ActivityObject 
source, ActivityObject destination) {
-
-        ObjectNode request = mapper.createObjectNode();
-
-        // set the activityObject's and extensions null, because their 
properties don't need to appear on the relationship
-        activity.setActor(null);
-        activity.setObject(null);
-        activity.setTarget(null);
-        activity.getAdditionalProperties().put("extensions", null);
-
-        ObjectNode object = mapper.convertValue(activity, ObjectNode.class);
-        Map<String, Object> props = PropertyUtil.flattenToMap(object, '.');
-
-        ST mergeEdge = new ST(createEdgeStatementTemplate);
-        mergeEdge.add("s_id", source.getId());
-        mergeEdge.add("s_type", source.getObjectType());
-        mergeEdge.add("d_id", destination.getId());
-        mergeEdge.add("d_type", destination.getObjectType());
-        mergeEdge.add("r_id", activity.getId());
-        mergeEdge.add("r_type", activity.getVerb());
-        mergeEdge.add("r_props", getPropertyCreater(props));
-
-        String statement = mergeEdge.render();
-        request.put(statementKey, statement);
-
-        return request;
-    }
-
-    public static String getPropertyValueSetter(Map<String, Object> map, 
String symbol) {
-        StringBuilder builder = new StringBuilder();
-        for( Map.Entry<String, Object> entry : map.entrySet()) {
-            if( entry.getValue() instanceof String ) {
-                String propVal = (String)(entry.getValue());
-                builder.append("," + symbol + ".`" + entry.getKey() + "` = '" 
+ propVal + "'");
-            }
-        }
-        return builder.toString();
-    }
-
-    public static String getPropertyParamSetter(Map<String, Object> map, 
String symbol) {
-        StringBuilder builder = new StringBuilder();
-        for( Map.Entry<String, Object> entry : map.entrySet()) {
-            if( entry.getValue() instanceof String ) {
-                String propVal = (String)(entry.getValue());
-                builder.append("," + symbol + ".`" + entry.getKey() + "` = '" 
+ propVal + "'");
-            }
-        }
-        return builder.toString();
-    }
-
-    public static String getPropertyCreater(Map<String, Object> map) {
-        StringBuilder builder = new StringBuilder();
-        builder.append("{");
-        List<String> parts = Lists.newArrayList();
-        for( Map.Entry<String, Object> entry : map.entrySet()) {
-            if( entry.getValue() instanceof String ) {
-                String propVal = (String) (entry.getValue());
-                parts.add("`"+entry.getKey() + "`:'" + propVal + "'");
-            }
-        }
-        builder.append(Joiner.on(",").join(parts));
-        builder.append("}");
-        return builder.toString();
-    }
-
-}


Reply via email to