http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
index 3bb4b97..06a6dc8 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/ElasticsearchQuery.java
@@ -18,16 +18,17 @@
 
 package org.apache.streams.elasticsearch;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.SearchType;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.script.Script;
-import org.elasticsearch.search.Scroll;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.sort.SortBuilders;
 import org.slf4j.Logger;
@@ -38,190 +39,209 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+/**
+ * Helper for building, querying, and paging an elasticsearch query.
+ */
 public class ElasticsearchQuery implements Iterable<SearchHit>, 
Iterator<SearchHit>, Serializable {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(ElasticsearchQuery.class);
-    private static final int SCROLL_POSITION_NOT_INITIALIZED = -3;
-
-    private ElasticsearchClientManager elasticsearchClientManager;
-    private ElasticsearchReaderConfiguration config;
-    private List<String> indexes = new ArrayList<>();
-    private List<String> types = new ArrayList<>();
-    private int limit = 1000 * 1000 * 1000; // we are going to set the default 
limit very high to 1bil
-    private int batchSize = 100;
-    private String scrollTimeout = "5m";
-    private org.elasticsearch.index.query.QueryBuilder queryBuilder;
-    private SearchRequestBuilder search;
-    private SearchResponse scrollResp;
-    private int scrollPositionInScroll = SCROLL_POSITION_NOT_INITIALIZED;
-    private SearchHit next = null;
-    private long totalHits = 0;
-    private long totalRead = 0;
-
-    private StreamsJacksonMapper mapper = StreamsJacksonMapper.getInstance();
-
-    public ElasticsearchQuery() {
-        this(new 
ComponentConfigurator<>(ElasticsearchReaderConfiguration.class)
-          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch")));
-    }
-
-    public ElasticsearchQuery(ElasticsearchReaderConfiguration config) {
-        this.config = config;
-        this.elasticsearchClientManager = new 
ElasticsearchClientManager(config);
-        this.indexes.addAll(config.getIndexes());
-        this.types.addAll(config.getTypes());
-        this.scrollTimeout = config.getScrollTimeout();
-    }
-
-    public long getHitCount() {
-        return this.search == null ? 0 : this.totalHits;
-    }
-
-    public long getReadCount() {
-        return this.totalRead;
-    }
-
-    public double getReadPercent() {
-        return (double) this.getReadCount() / (double) this.getHitCount();
-    }
-
-    public long getRemainingCount() {
-        return this.totalRead - this.totalHits;
-    }
-
-    public void setBatchSize(int batchSize) {
-        this.batchSize = batchSize;
-    }
-
-    public void setScrollTimeout(String scrollTimeout) {
-        this.scrollTimeout = scrollTimeout;
-    }
-
-    public void setQueryBuilder(QueryBuilder queryBuilder) {
-        this.queryBuilder = queryBuilder;
-    }
-
-    public void execute(Object o) {
-
-        // If we haven't already set up the search, then set up the search.
-        if (search == null) {
-
-            search = elasticsearchClientManager.getClient()
-                    .prepareSearch(indexes.toArray(new String[0]))
-                    .setSearchType(SearchType.SCAN)
-                    .setExplain(true)
-                    .addField("*")
-                    .setFetchSource(true)
-                    .setSize(batchSize)
-                    .setScroll(scrollTimeout)
-                    .addField("_timestamp");
-
-            LOGGER.debug("Search source: " + search.toString());
-
-            String searchJson;
-            if( config.getSearch() != null ) {
-                LOGGER.debug("Have config in Reader: " + 
config.getSearch().toString());
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ElasticsearchQuery.class);
+  private static final int SCROLL_POSITION_NOT_INITIALIZED = -3;
+
+  private ElasticsearchClientManager elasticsearchClientManager;
+  private ElasticsearchReaderConfiguration config;
+  private List<String> indexes = new ArrayList<>();
+  private List<String> types = new ArrayList<>();
+  private int limit = 1000 * 1000 * 1000; // we are going to set the default 
limit very high to 1bil
+  private int batchSize = 100;
+  private String scrollTimeout = "5m";
+  private org.elasticsearch.index.query.QueryBuilder queryBuilder;
+  private SearchRequestBuilder search;
+  private SearchResponse scrollResp;
+  private int scrollPositionInScroll = SCROLL_POSITION_NOT_INITIALIZED;
+  private SearchHit next = null;
+  private long totalHits = 0;
+  private long totalRead = 0;
+
+  private StreamsJacksonMapper mapper = StreamsJacksonMapper.getInstance();
+
+  /**
+   * ElasticsearchQuery constructor - resolves 
ElasticsearchReaderConfiguration from JVM 'elasticsearch'.
+   */
+  public ElasticsearchQuery() {
+    this(new ComponentConfigurator<>(ElasticsearchReaderConfiguration.class)
+        
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch")));
+  }
+
+  /**
+   * ElasticsearchQuery constructor - uses provided 
ElasticsearchReaderConfiguration.
+   */
+  public ElasticsearchQuery(ElasticsearchReaderConfiguration config) {
+    this.config = config;
+    this.elasticsearchClientManager = new ElasticsearchClientManager(config);
+    this.indexes.addAll(config.getIndexes());
+    this.types.addAll(config.getTypes());
+    this.scrollTimeout = config.getScrollTimeout();
+  }
+
+  public long getHitCount() {
+    return this.search == null ? 0 : this.totalHits;
+  }
+
+  public long getReadCount() {
+    return this.totalRead;
+  }
+
+  public double getReadPercent() {
+    return (double) this.getReadCount() / (double) this.getHitCount();
+  }
+
+  public long getRemainingCount() {
+    return this.totalHits - this.totalRead;
+  }
+
+  public void setBatchSize(int batchSize) {
+    this.batchSize = batchSize;
+  }
+
+  public void setScrollTimeout(String scrollTimeout) {
+    this.scrollTimeout = scrollTimeout;
+  }
+
+  public void setQueryBuilder(QueryBuilder queryBuilder) {
+    this.queryBuilder = queryBuilder;
+  }
+
+  /**
+   * execute ElasticsearchQuery.
+   * @param obj deprecated
+   */
+  public void execute(Object obj) {
+
+    // If we haven't already set up the search, then set up the search.
+    if (search == null) {
+
+      search = elasticsearchClientManager.getClient()
+          .prepareSearch(indexes.toArray(new String[0]))
+          .setSearchType(SearchType.SCAN)
+          .setExplain(true)
+          .addField("*")
+          .setFetchSource(true)
+          .setSize(batchSize)
+          .setScroll(scrollTimeout)
+          .addField("_timestamp");
+
+      LOGGER.debug("Search source: " + search.toString());
+
+      String searchJson;
+      if ( config.getSearch() != null ) {
+        LOGGER.debug("Have config in Reader: " + 
config.getSearch().toString());
 
-                try {
-                    searchJson = mapper.writeValueAsString(config.getSearch());
-                    LOGGER.debug("Extra source: " + searchJson);
-                    search = search.setExtraSource(searchJson);
-
-                } catch (JsonProcessingException e) {
-                    LOGGER.warn("Could not apply _search supplied by config", 
e.getMessage());
-                }
-
-
-            }
-
-            LOGGER.debug("Final Search: " + 
search.internalBuilder().toString());
-
-            if (this.queryBuilder != null)
-                search = search.setQuery(this.queryBuilder);
-
-            // If the types are null, then don't specify a type
-            if (this.types != null && this.types.size() > 0)
-                search = search.setTypes(types.toArray(new String[0]));
-
-            // TODO: Replace when all clusters are upgraded past 0.90.4 so we 
can implement a RANDOM scroll.
-            boolean random = false;
-            if (random)
-                search = search.addSort(SortBuilders.scriptSort(new 
Script("random()"), "number"));
-        }
-
-        // We don't have a scroll, we need to create a scroll
-        if (scrollResp == null) {
-            scrollResp = search.execute().actionGet();
-            LOGGER.trace(search.toString());
-        }
-    }
-
-    //Iterable methods
-    @Override
-    public Iterator<SearchHit> iterator() {
-        return this;
-    }
-
-    //Iterator methods
-    @Override
-    public SearchHit next() {
-        return this.next;
-    }
-
-    @Override
-    public boolean hasNext() {
-        calcNext();
-        return hasRecords();
-    }
-
-    public void calcNext() {
         try {
-            // We have exhausted our scroll create another scroll.
-            if (scrollPositionInScroll == SCROLL_POSITION_NOT_INITIALIZED || 
scrollPositionInScroll >= scrollResp.getHits().getHits().length) {
-                // reset the scroll position
-                scrollPositionInScroll = 0;
-
-                // get the next hits of the scroll
-                scrollResp = elasticsearchClientManager.getClient()
-                        .prepareSearchScroll(scrollResp.getScrollId())
-                        .setScroll(scrollTimeout)
-                        .execute()
-                        .actionGet();
-
-                this.totalHits = scrollResp.getHits().getTotalHits();
-            }
-
-            // If this scroll has 0 items then we set the scroll position to -1
-            // letting the iterator know that we are done.
-            if (scrollResp.getHits().getTotalHits() == 0 || 
scrollResp.getHits().getHits().length == 0)
-                scrollPositionInScroll = -1;
-            else {
-                // get the next record
-                next = scrollResp.getHits().getAt(scrollPositionInScroll);
-
-                // Increment our counters
-                scrollPositionInScroll += 1;
-                totalRead += 1;
-            }
-        } catch (Exception e) {
-            LOGGER.error("Unexpected scrolling error: {}", e.getMessage());
-            scrollPositionInScroll = -1;
-            next = null;
-        }
-    }
+          searchJson = mapper.writeValueAsString(config.getSearch());
+          LOGGER.debug("Extra source: " + searchJson);
+          search = search.setExtraSource(searchJson);
 
-    public void remove() {
-    }
-
-    public void cleanUp() {
-    }
+        } catch (JsonProcessingException ex) {
+          LOGGER.warn("Could not apply _search supplied by config: {}", ex.getMessage());
+        }
 
-    protected boolean isCompleted() {
-        return totalRead >= this.limit && hasRecords();
-    }
 
-    protected boolean hasRecords() {
-        return scrollPositionInScroll != -1 && (!(this.totalRead > 
this.limit));
-    }
+      }
+
+      LOGGER.debug("Final Search: " + search.internalBuilder().toString());
+
+      if (this.queryBuilder != null) {
+        search = search.setQuery(this.queryBuilder);
+      }
+
+      // If the types are null, then don't specify a type
+      if (this.types != null && this.types.size() > 0) {
+        search = search.setTypes(types.toArray(new String[0]));
+      }
+
+      // TODO: Replace when all clusters are upgraded past 0.90.4 so we can 
implement a RANDOM scroll.
+      boolean random = false;
+      if (random) {
+        search = search.addSort(SortBuilders.scriptSort(new 
Script("random()"), "number"));
+      }
+    }
+
+    // We don't have a scroll, we need to create a scroll
+    if (scrollResp == null) {
+      scrollResp = search.execute().actionGet();
+      LOGGER.trace(search.toString());
+    }
+  }
+
+  //Iterable methods
+  @Override
+  public Iterator<SearchHit> iterator() {
+    return this;
+  }
+
+  //Iterator methods
+  @Override
+  public SearchHit next() {
+    return this.next;
+  }
+
+  @Override
+  public boolean hasNext() {
+    calcNext();
+    return hasRecords();
+  }
+
+  /**
+   * shift to next page of scroll.
+   */
+  public void calcNext() {
+    try {
+      // We have exhausted our scroll create another scroll.
+      if (scrollPositionInScroll == SCROLL_POSITION_NOT_INITIALIZED || 
scrollPositionInScroll >= scrollResp.getHits().getHits().length) {
+        // reset the scroll position
+        scrollPositionInScroll = 0;
+
+        // get the next hits of the scroll
+        scrollResp = elasticsearchClientManager.getClient()
+            .prepareSearchScroll(scrollResp.getScrollId())
+            .setScroll(scrollTimeout)
+            .execute()
+            .actionGet();
+
+        this.totalHits = scrollResp.getHits().getTotalHits();
+      }
+
+      // If this scroll has 0 items then we set the scroll position to -1
+      // letting the iterator know that we are done.
+      if (scrollResp.getHits().getTotalHits() == 0 || 
scrollResp.getHits().getHits().length == 0) {
+        scrollPositionInScroll = -1;
+      } else {
+        // get the next record
+        next = scrollResp.getHits().getAt(scrollPositionInScroll);
+
+        // Increment our counters
+        scrollPositionInScroll += 1;
+        totalRead += 1;
+      }
+    } catch (Exception ex) {
+      LOGGER.error("Unexpected scrolling error: {}", ex.getMessage());
+      scrollPositionInScroll = -1;
+      next = null;
+    }
+  }
+
+  public void remove() {
+  }
+
+  public void cleanUp() {
+  }
+
+  protected boolean isCompleted() {
+    return totalRead >= this.limit && hasRecords();
+  }
+
+  protected boolean hasRecords() {
+    return scrollPositionInScroll != -1 && (!(this.totalRead > this.limit));
+  }
 
 }
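
For orientation, a minimal usage sketch of the reader above. It assumes a valid
"elasticsearch" config block is available to the no-arg constructor (hosts, port,
indexes, types) and uses only methods visible in this diff, plus the stock
org.elasticsearch.index.query.QueryBuilders factory:

    ElasticsearchQuery query = new ElasticsearchQuery();   // resolves ElasticsearchReaderConfiguration itself
    query.setBatchSize(500);                                // hits fetched per scroll page
    query.setQueryBuilder(QueryBuilders.matchAllQuery());   // hypothetical query; any QueryBuilder works
    query.execute(null);                                    // builds the SCAN search and opens the scroll
    while (query.hasNext()) {                               // pages the scroll forward as needed
      SearchHit hit = query.next();
      System.out.println(hit.getSourceAsString());
    }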

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
index 26012ef..6ce15d4 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataAsDocumentProcessor.java
@@ -18,15 +18,6 @@
 
 package org.apache.streams.elasticsearch.processor;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
-import com.typesafe.config.Config;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
@@ -35,90 +26,104 @@ import org.apache.streams.elasticsearch.ElasticsearchClientManager;
 import org.apache.streams.elasticsearch.ElasticsearchMetadataUtil;
 import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
+import com.typesafe.config.Config;
+
 import org.elasticsearch.action.get.GetRequestBuilder;
 import org.elasticsearch.action.get.GetResponse;
 import org.joda.time.DateTime;
 
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 /**
- * Uses index and type in metadata map stored in datum document to populate 
current document into datums
+ * Uses index and type in metadata map stored in datum document to populate 
current document into datums.
  */
 public class DatumFromMetadataAsDocumentProcessor implements StreamsProcessor, 
Serializable {
 
-    private final static String STREAMS_ID = "DatumFromMetadataProcessor";
+  private static final String STREAMS_ID = "DatumFromMetadataProcessor";
 
-    private ElasticsearchClientManager elasticsearchClientManager;
-    private ElasticsearchReaderConfiguration config;
+  private ElasticsearchClientManager elasticsearchClientManager;
+  private ElasticsearchReaderConfiguration config;
 
-    private ObjectMapper mapper;
+  private ObjectMapper mapper;
 
-    public DatumFromMetadataAsDocumentProcessor() {
-        this.config = new 
ComponentConfigurator<>(ElasticsearchReaderConfiguration.class)
-          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch"));
-    }
+  public DatumFromMetadataAsDocumentProcessor() {
+    this.config = new 
ComponentConfigurator<>(ElasticsearchReaderConfiguration.class)
+        
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch"));
+  }
 
-    public DatumFromMetadataAsDocumentProcessor(Config config) {
-        this.config = new 
ComponentConfigurator<>(ElasticsearchReaderConfiguration.class)
-          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch"));
-    }
+  public DatumFromMetadataAsDocumentProcessor(Config config) {
+    this.config = new 
ComponentConfigurator<>(ElasticsearchReaderConfiguration.class)
+        
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch"));
+  }
 
-    public 
DatumFromMetadataAsDocumentProcessor(ElasticsearchReaderConfiguration config) {
-        this.config = config;
-    }
+  public DatumFromMetadataAsDocumentProcessor(ElasticsearchReaderConfiguration 
config) {
+    this.config = config;
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-        List<StreamsDatum> result = new ArrayList<>();
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
+    List<StreamsDatum> result = new ArrayList<>();
 
-        ObjectNode metadataObjectNode;
-        try {
-            metadataObjectNode = mapper.readValue((String) 
entry.getDocument(), ObjectNode.class);
-        } catch (IOException e) {
-            return result;
-        }
+    ObjectNode metadataObjectNode;
+    try {
+      metadataObjectNode = mapper.readValue((String) entry.getDocument(), 
ObjectNode.class);
+    } catch (IOException ex) {
+      return result;
+    }
 
-        Map<String, Object> metadata = 
ElasticsearchMetadataUtil.asMap(metadataObjectNode);
+    Map<String, Object> metadata = 
ElasticsearchMetadataUtil.asMap(metadataObjectNode);
 
-        if(entry.getMetadata() == null)
-            return result;
+    if (entry.getMetadata() == null) {
+      return result;
+    }
 
-        String index = ElasticsearchMetadataUtil.getIndex(metadata, config);
-        String type = ElasticsearchMetadataUtil.getType(metadata, config);
-        String id = ElasticsearchMetadataUtil.getId(metadata);
+    String index = ElasticsearchMetadataUtil.getIndex(metadata, config);
+    String type = ElasticsearchMetadataUtil.getType(metadata, config);
+    String id = ElasticsearchMetadataUtil.getId(metadata);
 
-        GetRequestBuilder getRequestBuilder = 
elasticsearchClientManager.getClient().prepareGet(index, type, id);
-        getRequestBuilder.setFields("*", "_timestamp");
-        getRequestBuilder.setFetchSource(true);
-        GetResponse getResponse = getRequestBuilder.get();
+    GetRequestBuilder getRequestBuilder = 
elasticsearchClientManager.getClient().prepareGet(index, type, id);
+    getRequestBuilder.setFields("*", "_timestamp");
+    getRequestBuilder.setFetchSource(true);
+    GetResponse getResponse = getRequestBuilder.get();
 
-        if( getResponse == null || !getResponse.isExists() || 
getResponse.isSourceEmpty())
-            return result;
+    if ( getResponse == null || !getResponse.isExists() || 
getResponse.isSourceEmpty()) {
+      return result;
+    }
 
-        entry.setDocument(getResponse.getSource());
-        if( getResponse.getField("_timestamp") != null) {
-            DateTime timestamp = new DateTime(((Long) 
getResponse.getField("_timestamp").getValue()).longValue());
-            entry.setTimestamp(timestamp);
-        }
+    entry.setDocument(getResponse.getSource());
+    if ( getResponse.getField("_timestamp") != null) {
+      DateTime timestamp = new DateTime(((Long) 
getResponse.getField("_timestamp").getValue()).longValue());
+      entry.setTimestamp(timestamp);
+    }
 
-        result.add(entry);
+    result.add(entry);
 
-        return result;
-    }
+    return result;
+  }
 
-    @Override
-    public void prepare(Object configurationObject) {
-        this.elasticsearchClientManager = new 
ElasticsearchClientManager(config);
-        mapper = StreamsJacksonMapper.getInstance();
-        mapper.registerModule(new JsonOrgModule());
-    }
+  @Override
+  public void prepare(Object configurationObject) {
+    this.elasticsearchClientManager = new ElasticsearchClientManager(config);
+    mapper = StreamsJacksonMapper.getInstance();
+    mapper.registerModule(new JsonOrgModule());
+  }
 
-    @Override
-    public void cleanUp() {
-        this.elasticsearchClientManager.getClient().close();
-    }
+  @Override
+  public void cleanUp() {
+    this.elasticsearchClientManager.getClient().close();
+  }
 
 }
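
A sketch of how this processor might be driven. The metadata key names here
("index", "type", "id") are assumptions about what ElasticsearchMetadataUtil
reads, and the explicit empty metadata map guards the null check in process():

    DatumFromMetadataAsDocumentProcessor processor = new DatumFromMetadataAsDocumentProcessor();
    processor.prepare(null);   // builds the client manager and a JsonOrg-aware mapper

    // The document itself is a JSON metadata record naming what to fetch.
    StreamsDatum datum = new StreamsDatum(
        "{\"index\":\"activity\",\"type\":\"post\",\"id\":\"123\"}");
    datum.setMetadata(new java.util.HashMap<String, Object>());   // process() returns early on null metadata

    List<StreamsDatum> fetched = processor.process(datum);   // empty if the GET misses or the source is empty
    processor.cleanUp();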

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
index 7897e8d..bef190e 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DatumFromMetadataProcessor.java
@@ -18,7 +18,6 @@
 
 package org.apache.streams.elasticsearch.processor;
 
-import com.typesafe.config.Config;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.config.StreamsConfigurator;
 import org.apache.streams.core.StreamsDatum;
@@ -26,6 +25,9 @@ import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.elasticsearch.ElasticsearchClientManager;
 import org.apache.streams.elasticsearch.ElasticsearchMetadataUtil;
 import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
+
+import com.typesafe.config.Config;
+
 import org.elasticsearch.action.get.GetRequestBuilder;
 import org.elasticsearch.action.get.GetResponse;
 import org.joda.time.DateTime;
@@ -36,74 +38,76 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Uses index and type in metadata to populate current document into datums
+ * Uses index and type in metadata to populate current document into datums.
  */
 public class DatumFromMetadataProcessor implements StreamsProcessor, 
Serializable {
 
-    private final static String STREAMS_ID = "DatumFromMetadataProcessor";
+  private static final String STREAMS_ID = "DatumFromMetadataProcessor";
 
-    private ElasticsearchClientManager elasticsearchClientManager;
-    private ElasticsearchReaderConfiguration config;
+  private ElasticsearchClientManager elasticsearchClientManager;
+  private ElasticsearchReaderConfiguration config;
 
-    public DatumFromMetadataProcessor() {
-        this.config = new 
ComponentConfigurator<>(ElasticsearchReaderConfiguration.class)
-          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch"));
-    }
+  public DatumFromMetadataProcessor() {
+    this.config = new 
ComponentConfigurator<>(ElasticsearchReaderConfiguration.class)
+        
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch"));
+  }
 
-    public DatumFromMetadataProcessor(Config config) {
-        this.config = new 
ComponentConfigurator<>(ElasticsearchReaderConfiguration.class)
-          
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch"));
-    }
+  public DatumFromMetadataProcessor(Config config) {
+    this.config = new 
ComponentConfigurator<>(ElasticsearchReaderConfiguration.class)
+        
.detectConfiguration(StreamsConfigurator.getConfig().getConfig("elasticsearch"));
+  }
 
-    public DatumFromMetadataProcessor(ElasticsearchReaderConfiguration config) 
{
-        this.config = config;
-    }
+  public DatumFromMetadataProcessor(ElasticsearchReaderConfiguration config) {
+    this.config = config;
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-        List<StreamsDatum> result = new ArrayList<>();
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
+    List<StreamsDatum> result = new ArrayList<>();
 
-        if(entry == null || entry.getMetadata() == null)
-            return result;
+    if (entry == null || entry.getMetadata() == null) {
+      return result;
+    }
 
-        Map<String, Object> metadata = entry.getMetadata();
+    Map<String, Object> metadata = entry.getMetadata();
 
-        String index = ElasticsearchMetadataUtil.getIndex(metadata, config);
-        String type = ElasticsearchMetadataUtil.getType(metadata, config);
-        String id = ElasticsearchMetadataUtil.getId(entry);
+    String index = ElasticsearchMetadataUtil.getIndex(metadata, config);
+    String type = ElasticsearchMetadataUtil.getType(metadata, config);
+    String id = ElasticsearchMetadataUtil.getId(entry);
 
-        GetRequestBuilder getRequestBuilder = 
elasticsearchClientManager.getClient().prepareGet(index, type, id);
-        getRequestBuilder.setFields("*", "_timestamp");
-        getRequestBuilder.setFetchSource(true);
-        GetResponse getResponse = getRequestBuilder.get();
+    GetRequestBuilder getRequestBuilder = 
elasticsearchClientManager.getClient().prepareGet(index, type, id);
+    getRequestBuilder.setFields("*", "_timestamp");
+    getRequestBuilder.setFetchSource(true);
+    GetResponse getResponse = getRequestBuilder.get();
 
-        if( getResponse == null || !getResponse.isExists() || 
getResponse.isSourceEmpty() )
-            return result;
+    if ( getResponse == null || !getResponse.isExists() || 
getResponse.isSourceEmpty() ) {
+      return result;
+    }
 
-        entry.setDocument(getResponse.getSource());
-        if( getResponse.getField("_timestamp") != null) {
-            DateTime timestamp = new DateTime(((Long) 
getResponse.getField("_timestamp").getValue()).longValue());
-            entry.setTimestamp(timestamp);
-        }
+    entry.setDocument(getResponse.getSource());
+    if ( getResponse.getField("_timestamp") != null) {
+      DateTime timestamp = new DateTime(((Long) 
getResponse.getField("_timestamp").getValue()).longValue());
+      entry.setTimestamp(timestamp);
+    }
 
-        result.add(entry);
+    result.add(entry);
 
-        return result;
-    }
+    return result;
+  }
 
-    @Override
-    public void prepare(Object configurationObject) {
-        this.elasticsearchClientManager = new 
ElasticsearchClientManager(config);
+  @Override
+  public void prepare(Object configurationObject) {
+    this.elasticsearchClientManager = new ElasticsearchClientManager(config);
 
-    }
+  }
 
-    @Override
-    public void cleanUp() {
-        this.elasticsearchClientManager.getClient().close();
-    }
+  @Override
+  public void cleanUp() {
+    this.elasticsearchClientManager.getClient().close();
+  }
 }
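
The variant above differs from DatumFromMetadataAsDocumentProcessor in where it
finds the lookup coordinates: entry.getMetadata() rather than the document body.
A sketch, again with assumed key names:

    DatumFromMetadataProcessor processor = new DatumFromMetadataProcessor();
    processor.prepare(null);

    StreamsDatum datum = new StreamsDatum("{}");
    java.util.Map<String, Object> metadata = new java.util.HashMap<>();
    metadata.put("index", "activity");   // assumed keys, resolved via ElasticsearchMetadataUtil
    metadata.put("type", "post");
    metadata.put("id", "123");
    datum.setMetadata(metadata);

    List<StreamsDatum> result = processor.process(datum);   // document replaced by the fetched _source
    processor.cleanUp();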

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
index 9a08654..2a64fbc 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/DocumentToMetadataProcessor.java
@@ -18,13 +18,15 @@
 
 package org.apache.streams.elasticsearch.processor;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.elasticsearch.ElasticsearchMetadataUtil;
 import org.apache.streams.jackson.StreamsJacksonMapper;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,60 +38,62 @@ import java.util.Map;
 /**
  * Moves a json representation of metadata out of the document to the metadata 
field.
  *
+ * <p>
  * This is useful if you have a list of document metadata references in the 
document field,
  * for example loaded from a file, and need them in the metadata field.
  */
 public class DocumentToMetadataProcessor implements StreamsProcessor, 
Serializable {
 
-    private final static String STREAMS_ID = "DatumFromMetadataProcessor";
+  private static final String STREAMS_ID = "DatumFromMetadataProcessor";
 
-    private ObjectMapper mapper;
+  private ObjectMapper mapper;
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(DocumentToMetadataProcessor.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(DocumentToMetadataProcessor.class);
 
-    public DocumentToMetadataProcessor() {
-    }
+  public DocumentToMetadataProcessor() {
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-        List<StreamsDatum> result = new ArrayList<>();
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
+    List<StreamsDatum> result = new ArrayList<>();
 
-        Object object = entry.getDocument();
-        ObjectNode metadataObjectNode;
-        try {
-            String docAsJson = (object instanceof String) ? object.toString() 
: mapper.writeValueAsString(object);
-            metadataObjectNode = mapper.readValue(docAsJson, ObjectNode.class);
-        } catch (Throwable e) {
-            LOGGER.warn("Exception: %s", e.getMessage());
-            return result;
-        }
+    Object object = entry.getDocument();
+    ObjectNode metadataObjectNode;
+    try {
+      String docAsJson = (object instanceof String) ? object.toString() : 
mapper.writeValueAsString(object);
+      metadataObjectNode = mapper.readValue(docAsJson, ObjectNode.class);
+    } catch (Throwable ex) {
+      LOGGER.warn("Exception: {}", ex.getMessage());
+      return result;
+    }
 
-        Map<String, Object> metadata = 
ElasticsearchMetadataUtil.asMap(metadataObjectNode);
+    Map<String, Object> metadata = 
ElasticsearchMetadataUtil.asMap(metadataObjectNode);
 
-        if(metadata == null)
-            return result;
+    if ( metadata == null ) {
+      return result;
+    }
 
-        entry.setMetadata(metadata);
+    entry.setMetadata(metadata);
 
-        result.add(entry);
+    result.add(entry);
 
-        return result;
-    }
+    return result;
+  }
 
-    @Override
-    public void prepare(Object configurationObject) {
-        mapper = StreamsJacksonMapper.getInstance();
-        mapper.registerModule(new JsonOrgModule());
-    }
+  @Override
+  public void prepare(Object configurationObject) {
+    mapper = StreamsJacksonMapper.getInstance();
+    mapper.registerModule(new JsonOrgModule());
+  }
 
-    @Override
-    public void cleanUp() {
-        mapper = null;
-    }
+  @Override
+  public void cleanUp() {
+    mapper = null;
+  }
 
 }
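
A sketch of the promotion this processor performs, using a hypothetical metadata
record such as one line of JSON loaded from a file:

    DocumentToMetadataProcessor processor = new DocumentToMetadataProcessor();
    processor.prepare(null);   // installs StreamsJacksonMapper with the JsonOrgModule

    StreamsDatum datum = new StreamsDatum(
        "{\"index\":\"activity\",\"type\":\"post\",\"id\":\"123\"}");
    StreamsDatum promoted = processor.process(datum).get(0);
    // promoted.getMetadata() now carries the parsed entries; the document is left in place.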

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java
 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java
index e9aa900..721ad42 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/MetadataFromDocumentProcessor.java
@@ -18,16 +18,17 @@
 
 package org.apache.streams.elasticsearch.processor;
 
+import org.apache.streams.core.StreamsDatum;
+import org.apache.streams.core.StreamsProcessor;
+import org.apache.streams.jackson.StreamsJacksonMapper;
+
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.fasterxml.jackson.datatype.jsonorg.JsonOrgModule;
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import org.apache.streams.core.StreamsDatum;
-import org.apache.streams.core.StreamsProcessor;
-import org.apache.streams.elasticsearch.ElasticsearchMetadataUtil;
-import org.apache.streams.jackson.StreamsJacksonMapper;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,82 +40,94 @@ import java.util.Map;
 /**
  * Examines document to derive metadata fields.
  *
+ * <p>
  * This is useful if you have a document with a populated 'id', and 'verb' or 
'objectType' fields you want
  * to use as _id and _type respectively when indexing.
  */
 public class MetadataFromDocumentProcessor implements StreamsProcessor, 
Serializable {
 
-    public final static String STREAMS_ID = "MetadataFromDocumentProcessor";
+  public static final String STREAMS_ID = "MetadataFromDocumentProcessor";
 
-    private ObjectMapper mapper;
+  private ObjectMapper mapper;
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(MetadataFromDocumentProcessor.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(MetadataFromDocumentProcessor.class);
 
-    public MetadataFromDocumentProcessor() {
-    }
+  public MetadataFromDocumentProcessor() {
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
+
+    if ( mapper == null ) {
+      mapper = StreamsJacksonMapper.getInstance();
     }
 
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-
-        if( mapper == null ) mapper = StreamsJacksonMapper.getInstance();
-
-        List<StreamsDatum> result = Lists.newArrayList();
-
-        Map<String, Object> metadata = entry.getMetadata();
-        if( metadata == null ) metadata = Maps.newHashMap();
-
-        String id = null;
-        String type = null;
-
-        Object document = entry.getDocument();
-        ObjectNode objectNode = null;
-        if( document instanceof String) {
-            try {
-                objectNode = mapper.readValue((String) document, 
ObjectNode.class);
-            } catch (IOException e) {
-                LOGGER.warn("Can't deserialize to determine metadata", e);
-            }
-        } else {
-            try {
-                objectNode = mapper.convertValue(document, ObjectNode.class);
-            } catch (Exception e) {
-                LOGGER.warn("Can't deserialize to determine metadata", e);
-            }
-        }
-        if( objectNode != null ) {
-            if (objectNode.has("id"))
-                id = objectNode.get("id").textValue();
-            if (objectNode.has("verb"))
-                type = objectNode.get("verb").textValue();
-            if (objectNode.has("objectType"))
-                type = objectNode.get("objectType").textValue();
-        }
-
-        if( !Strings.isNullOrEmpty(id) ) metadata.put("id", id);
-        if( !Strings.isNullOrEmpty(type) ) metadata.put("type", type);
-
-        entry.setId(id);
-        entry.setMetadata(metadata);
-
-        result.add(entry);
-
-        return result;
+    List<StreamsDatum> result = Lists.newArrayList();
+
+    Map<String, Object> metadata = entry.getMetadata();
+    if ( metadata == null ) {
+      metadata = Maps.newHashMap();
     }
 
-    @Override
-    public void prepare(Object configurationObject) {
-        mapper = StreamsJacksonMapper.getInstance();
-        mapper.registerModule(new JsonOrgModule());
+    String id = null;
+    String type = null;
+
+    Object document = entry.getDocument();
+    ObjectNode objectNode = null;
+    if ( document instanceof String) {
+      try {
+        objectNode = mapper.readValue((String) document, ObjectNode.class);
+      } catch (IOException ex) {
+        LOGGER.warn("Can't deserialize to determine metadata", ex);
+      }
+    } else {
+      try {
+        objectNode = mapper.convertValue(document, ObjectNode.class);
+      } catch (Exception ex) {
+        LOGGER.warn("Can't deserialize to determine metadata", ex);
+      }
+    }
+    if ( objectNode != null ) {
+      if (objectNode.has("id")) {
+        id = objectNode.get("id").textValue();
+      }
+      if (objectNode.has("verb")) {
+        type = objectNode.get("verb").textValue();
+      }
+      if (objectNode.has("objectType")) {
+        type = objectNode.get("objectType").textValue();
+      }
     }
 
-    @Override
-    public void cleanUp() {
-        mapper = null;
+    if ( !Strings.isNullOrEmpty(id) ) {
+      metadata.put("id", id);
     }
+    if ( !Strings.isNullOrEmpty(type) ) {
+      metadata.put("type", type);
+    }
+
+    entry.setId(id);
+    entry.setMetadata(metadata);
+
+    result.add(entry);
+
+    return result;
+  }
+
+  @Override
+  public void prepare(Object configurationObject) {
+    mapper = StreamsJacksonMapper.getInstance();
+    mapper.registerModule(new JsonOrgModule());
+  }
+
+  @Override
+  public void cleanUp() {
+    mapper = null;
+  }
 
 }
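
A sketch of the derivation the javadoc above describes: "id" feeds both the
datum id and the metadata "id" entry, while "verb" supplies the metadata "type"
unless "objectType" is also present, in which case objectType wins:

    MetadataFromDocumentProcessor processor = new MetadataFromDocumentProcessor();
    processor.prepare(null);

    StreamsDatum datum = new StreamsDatum("{\"id\":\"act-1\",\"verb\":\"post\"}");
    StreamsDatum tagged = processor.process(datum).get(0);
    // tagged.getId()                    -> "act-1"
    // tagged.getMetadata().get("type")  -> "post"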

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
----------------------------------------------------------------------
diff --git a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
index f37527a..69394ee 100644
--- a/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
+++ b/streams-contrib/streams-persist-elasticsearch/src/main/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessor.java
@@ -18,13 +18,6 @@
 
 package org.apache.streams.elasticsearch.processor;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.JsonNodeFactory;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.core.StreamsProcessor;
 import org.apache.streams.elasticsearch.ElasticsearchClientManager;
@@ -33,6 +26,15 @@ import org.apache.streams.elasticsearch.ElasticsearchWriterConfiguration;
 import org.apache.streams.jackson.StreamsJacksonMapper;
 import org.apache.streams.pojo.extensions.ExtensionUtil;
 import org.apache.streams.pojo.json.Activity;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
 import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
 import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
@@ -51,7 +53,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
 
 /**
  * References:
@@ -65,287 +71,304 @@ import java.util.*;
 
 public class PercolateTagProcessor implements StreamsProcessor {
 
-    public static final String STREAMS_ID = "PercolateTagProcessor";
-    private final static Logger LOGGER = 
LoggerFactory.getLogger(PercolateTagProcessor.class);
-    private final static String DEFAULT_PERCOLATE_FIELD = "_all";
-
-    private ObjectMapper mapper;
-
-    protected Queue<StreamsDatum> inQueue;
-    protected Queue<StreamsDatum> outQueue;
-
-    public String TAGS_EXTENSION = "tags";
-
-    private ElasticsearchWriterConfiguration config;
-    private ElasticsearchClientManager manager;
-    private BulkRequestBuilder bulkBuilder;
-    protected String usePercolateField;
-
-    public PercolateTagProcessor(ElasticsearchWriterConfiguration config) {
-        this(config, DEFAULT_PERCOLATE_FIELD);
+  public static final String STREAMS_ID = "PercolateTagProcessor";
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(PercolateTagProcessor.class);
+  private static final String DEFAULT_PERCOLATE_FIELD = "_all";
+
+  private ObjectMapper mapper;
+
+  protected Queue<StreamsDatum> inQueue;
+  protected Queue<StreamsDatum> outQueue;
+
+  public static final String TAGS_EXTENSION = "tags";
+
+  private ElasticsearchWriterConfiguration config;
+  private ElasticsearchClientManager manager;
+  private BulkRequestBuilder bulkBuilder;
+  protected String usePercolateField;
+
+  public PercolateTagProcessor(ElasticsearchWriterConfiguration config) {
+    this(config, DEFAULT_PERCOLATE_FIELD);
+  }
+
+  public PercolateTagProcessor(ElasticsearchWriterConfiguration config, String 
defaultPercolateField) {
+    this.config = config;
+    this.usePercolateField = defaultPercolateField;
+  }
+
+  public ElasticsearchClientManager getManager() {
+    return manager;
+  }
+
+  public void setManager(ElasticsearchClientManager manager) {
+    this.manager = manager;
+  }
+
+  public ElasticsearchConfiguration getConfig() {
+    return config;
+  }
+
+  public void setConfig(ElasticsearchWriterConfiguration config) {
+    this.config = config;
+  }
+
+  public Queue<StreamsDatum> getProcessorOutputQueue() {
+    return outQueue;
+  }
+
+  @Override
+  public String getId() {
+    return STREAMS_ID;
+  }
+
+  @Override
+  public List<StreamsDatum> process(StreamsDatum entry) {
+
+    List<StreamsDatum> result = Lists.newArrayList();
+
+    String json;
+    ObjectNode node;
+    // first check for valid json
+    if (entry.getDocument() instanceof String) {
+      json = (String) entry.getDocument();
+      try {
+        node = (ObjectNode) mapper.readTree(json);
+      } catch (IOException ex) {
+        ex.printStackTrace();
+        return null;
+      }
+    } else if (entry.getDocument() instanceof ObjectNode) {
+      node = (ObjectNode) entry.getDocument();
+      try {
+        json = mapper.writeValueAsString(node);
+      } catch (JsonProcessingException ex) {
+        LOGGER.warn("Invalid datum: {}", node);
+        return null;
+      }
+    } else {
+      LOGGER.warn("Incompatible document type: {}", entry.getDocument().getClass());
+      return null;
     }
 
-    public PercolateTagProcessor(ElasticsearchWriterConfiguration config, 
String defaultPercolateField) {
-        this.config = config;
-        this.usePercolateField = defaultPercolateField;
+    StringBuilder percolateRequestJson = new StringBuilder();
+    percolateRequestJson.append("{ \"doc\": ");
+    percolateRequestJson.append(json);
+    //percolateRequestJson.append("{ \"content\" : \"crazy good shit\" }");
+    percolateRequestJson.append("}");
+
+    PercolateRequestBuilder request;
+    PercolateResponse response;
+
+    try {
+      LOGGER.trace("Percolate request json: {}", 
percolateRequestJson.toString());
+      request = 
manager.getClient().preparePercolate().setIndices(config.getIndex()).setDocumentType(config.getType()).setSource(percolateRequestJson.toString());
+      LOGGER.trace("Percolate request: {}", 
mapper.writeValueAsString(request.request()));
+      response = request.execute().actionGet();
+      LOGGER.trace("Percolate response: {} matches", 
response.getMatches().length);
+    } catch (Exception ex) {
+      LOGGER.warn("Percolate exception: {}", ex.getMessage());
+      return null;
     }
 
-    public ElasticsearchClientManager getManager() {
-        return manager;
-    }
+    ArrayNode tagArray = JsonNodeFactory.instance.arrayNode();
 
-    public void setManager(ElasticsearchClientManager manager) {
-        this.manager = manager;
+    Iterator<PercolateResponse.Match> matchIterator = response.iterator();
+    while (matchIterator.hasNext()) {
+      tagArray.add(matchIterator.next().getId().string());
     }
 
-    public ElasticsearchConfiguration getConfig() {
-        return config;
-    }
+    LOGGER.trace("Percolate matches: {}", tagArray);
 
-    public void setConfig(ElasticsearchWriterConfiguration config) {
-        this.config = config;
-    }
+    Activity activity = mapper.convertValue(node, Activity.class);
 
-    public Queue<StreamsDatum> getProcessorOutputQueue() {
-        return outQueue;
-    }
+    appendMatches(tagArray, activity);
 
-    @Override
-    public String getId() {
-        return STREAMS_ID;
-    }
+    entry.setDocument(activity);
 
-    @Override
-    public List<StreamsDatum> process(StreamsDatum entry) {
-
-        List<StreamsDatum> result = Lists.newArrayList();
-
-        String json;
-        ObjectNode node;
-        // first check for valid json
-        if (entry.getDocument() instanceof String) {
-            json = (String) entry.getDocument();
-            try {
-                node = (ObjectNode) mapper.readTree(json);
-            } catch (IOException e) {
-                e.printStackTrace();
-                return null;
-            }
-        } else if (entry.getDocument() instanceof ObjectNode) {
-            node = (ObjectNode) entry.getDocument();
-            try {
-                json = mapper.writeValueAsString(node);
-            } catch (JsonProcessingException e) {
-                LOGGER.warn("Invalid datum: ", node);
-                return null;
-            }
-        } else {
-            LOGGER.warn("Incompatible document type: ", 
entry.getDocument().getClass());
-            return null;
-        }
-
-        StringBuilder percolateRequestJson = new StringBuilder();
-        percolateRequestJson.append("{ \"doc\": ");
-        percolateRequestJson.append(json);
-        //percolateRequestJson.append("{ \"content\" : \"crazy good shit\" }");
-        percolateRequestJson.append("}");
-
-        PercolateRequestBuilder request;
-        PercolateResponse response;
-
-        try {
-            LOGGER.trace("Percolate request json: {}", 
percolateRequestJson.toString());
-            request = 
manager.getClient().preparePercolate().setIndices(config.getIndex()).setDocumentType(config.getType()).setSource(percolateRequestJson.toString());
-            LOGGER.trace("Percolate request: {}", 
mapper.writeValueAsString(request.request()));
-            response = request.execute().actionGet();
-            LOGGER.trace("Percolate response: {} matches", 
response.getMatches().length);
-        } catch (Exception e) {
-            LOGGER.warn("Percolate exception: {}", e.getMessage());
-            return null;
-        }
-
-        ArrayNode tagArray = JsonNodeFactory.instance.arrayNode();
-
-        Iterator<PercolateResponse.Match> matchIterator = response.iterator();
-        while(matchIterator.hasNext()) {
-            tagArray.add(matchIterator.next().getId().string());
-        }
-
-        LOGGER.trace("Percolate matches: {}", tagArray);
-
-        Activity activity = mapper.convertValue(node, Activity.class);
-
-        appendMatches(tagArray, activity);
-
-        entry.setDocument(activity);
-
-        result.add(entry);
-
-        return result;
+    result.add(entry);
 
-    }
+    return result;
 
-    protected void appendMatches(ArrayNode tagArray, Activity activity) {
+  }
 
-        ExtensionUtil.getInstance().addExtension(activity, TAGS_EXTENSION, 
tagArray);
+  protected void appendMatches(ArrayNode tagArray, Activity activity) {
 
-    }
+    ExtensionUtil.getInstance().addExtension(activity, TAGS_EXTENSION, 
tagArray);
 
-    @Override
-    public void prepare(Object o) {
+  }
 
-        mapper = StreamsJacksonMapper.getInstance();
+  @Override
+  public void prepare(Object configuration) {
 
-        Preconditions.checkNotNull(config);
+    mapper = StreamsJacksonMapper.getInstance();
 
-        manager = new ElasticsearchClientManager(config);
+    Preconditions.checkNotNull(config);
 
-        if( config.getTags() != null && 
config.getTags().getAdditionalProperties().size() > 0) {
-            // initial write tags to index
-            createIndexIfMissing(config.getIndex());
-            if( config.getReplaceTags() == true ) {
-                deleteOldQueries(config.getIndex());
-            }
-            for (String tag : 
config.getTags().getAdditionalProperties().keySet()) {
-                String query = (String) 
config.getTags().getAdditionalProperties().get(tag);
-                PercolateQueryBuilder queryBuilder = new 
PercolateQueryBuilder(tag, query, this.usePercolateField);
-                addPercolateRule(queryBuilder, config.getIndex());
-            }
-            bulkBuilder = manager.getClient().prepareBulk();
+    manager = new ElasticsearchClientManager(config);
 
-            if (writePercolateRules() == true)
-                LOGGER.info("wrote " + bulkBuilder.numberOfActions() + " tags 
to " + config.getIndex() + " _percolator");
-            else
-                LOGGER.error("FAILED writing " + bulkBuilder.numberOfActions() 
+ " tags to " + config.getIndex() + " _percolator");
-        }
+    if ( config.getTags() != null && 
config.getTags().getAdditionalProperties().size() > 0) {
+      // initial write tags to index
+      createIndexIfMissing(config.getIndex());
+      if ( config.getReplaceTags() == true ) {
+        deleteOldQueries(config.getIndex());
+      }
+      for (String tag : config.getTags().getAdditionalProperties().keySet()) {
+        String query = (String) 
config.getTags().getAdditionalProperties().get(tag);
+        PercolateQueryBuilder queryBuilder = new PercolateQueryBuilder(tag, 
query, this.usePercolateField);
+        addPercolateRule(queryBuilder, config.getIndex());
+      }
+      bulkBuilder = manager.getClient().prepareBulk();
 
+      if (writePercolateRules() == true) {
+        LOGGER.info("wrote " + bulkBuilder.numberOfActions() + " tags to " + 
config.getIndex() + " _percolator");
+      } else {
+        LOGGER.error("FAILED writing " + bulkBuilder.numberOfActions() + " 
tags to " + config.getIndex() + " _percolator");
+      }
     }
 
-    @Override
-    public void cleanUp() {
-        if( config.getCleanupTags() == true )
-            deleteOldQueries(config.getIndex());
-        manager.getClient().close();
-    }
+  }
 
-    public int numOfPercolateRules() {
-        return this.bulkBuilder.numberOfActions();
+  @Override
+  public void cleanUp() {
+    if ( config.getCleanupTags() == true ) {
+      deleteOldQueries(config.getIndex());
     }
-
-    public void createIndexIfMissing(String indexName) {
-        if (!this.manager.getClient()
-                .admin()
-                .indices()
-                .exists(new IndicesExistsRequest(indexName))
-                .actionGet()
-                .isExists()) {
-            // It does not exist... So we are going to need to create the 
index.
-            // we are going to assume that the 'templates' that we have loaded 
into
-            // elasticsearch are sufficient to ensure the index is being 
created properly.
-            CreateIndexResponse response = 
this.manager.getClient().admin().indices().create(new 
CreateIndexRequest(indexName)).actionGet();
-
-            if (response.isAcknowledged()) {
-                LOGGER.info("Index {} did not exist. The index was 
automatically created from the stored ElasticSearch Templates.", indexName);
-            } else {
-                LOGGER.error("Index {} did not exist. While attempting to 
create the index from stored ElasticSearch Templates we were unable to get an 
acknowledgement.", indexName);
-                LOGGER.error("Error Message: {}", response.toString());
-                throw new RuntimeException("Unable to create index " + 
indexName);
-            }
-        }
+    manager.getClient().close();
+  }
+
+  public int numOfPercolateRules() {
+    return this.bulkBuilder.numberOfActions();
+  }
+
+  /**
+   * createIndexIfMissing.
+   * @param indexName indexName
+   */
+  public void createIndexIfMissing(String indexName) {
+    if (!this.manager.getClient()
+        .admin()
+        .indices()
+        .exists(new IndicesExistsRequest(indexName))
+        .actionGet()
+        .isExists()) {
+      // It does not exist... So we are going to need to create the index.
+      // we are going to assume that the 'templates' that we have loaded into
+      // elasticsearch are sufficient to ensure the index is being created properly.
+      CreateIndexResponse response = this.manager.getClient().admin().indices().create(new CreateIndexRequest(indexName)).actionGet();
+
+      if (response.isAcknowledged()) {
+        LOGGER.info("Index {} did not exist. The index was automatically created from the stored ElasticSearch Templates.", indexName);
+      } else {
+        LOGGER.error("Index {} did not exist. While attempting to create the index from stored ElasticSearch Templates we were unable to get an acknowledgement.", indexName);
+        LOGGER.error("Error Message: {}", response.toString());
+        throw new RuntimeException("Unable to create index " + indexName);
+      }
     }
-
-    public void addPercolateRule(PercolateQueryBuilder builder, String index) {
-        this.bulkBuilder.add(manager.getClient().prepareIndex(index, ".percolator", builder.getId())
-                .setSource(builder.getSource()));
+  }
+
+  public void addPercolateRule(PercolateQueryBuilder builder, String index) {
+    this.bulkBuilder.add(manager.getClient().prepareIndex(index, ".percolator", builder.getId())
+        .setSource(builder.getSource()));
+  }
+
+  /**
+   * Write all added percolate rules with a single bulk request.
+   * @return returns true if all rules were added. False indicates one or more rules have failed.
+   */
+  public boolean writePercolateRules() {
+    if (this.numOfPercolateRules() == 0) {
+      throw new RuntimeException("No Rules Have been added!");
     }
-
-    /**
-     *
-     * @return returns true if all rules were addded. False indicates one or more rules have failed.
-     */
-    public boolean writePercolateRules() {
-        if(this.numOfPercolateRules() < 0) {
-            throw new RuntimeException("No Rules Have been added!");
-        }
-        BulkResponse response = this.bulkBuilder.execute().actionGet();
-        for(BulkItemResponse r : response.getItems()) {
-            if(r.isFailed()) {
-                LOGGER.error(r.getId()+"\t"+r.getFailureMessage());
-            }
-        }
-        return !response.hasFailures();
+    BulkResponse response = this.bulkBuilder.execute().actionGet();
+    for (BulkItemResponse r : response.getItems()) {
+      if (r.isFailed()) {
+        LOGGER.error(r.getId() + "\t" + r.getFailureMessage());
+      }
     }
-
-    /**
-     *
-     * @param ids
-     * @param index
-     * @return  Returns true if all of the old tags were removed. False indicates one or more tags were not removed.
-     */
-    public boolean removeOldTags(Set<String> ids, String index) {
-        if(ids.size() == 0) {
-            return false;
-        }
-        BulkRequestBuilder bulk = manager.getClient().prepareBulk();
-        for(String id : ids) {
-            bulk.add(manager.getClient().prepareDelete("_percolator", index, id));
-        }
-        return !bulk.execute().actionGet().hasFailures();
+    return !response.hasFailures();
+  }
+
+  /**
+   * Attempt to remove old tags from the percolator.
+   * @param ids ids of the tags to delete
+   * @param index index holding the percolator documents
+   * @return Returns true if all of the old tags were removed. False indicates one or more tags were not removed.
+   */
+  public boolean removeOldTags(Set<String> ids, String index) {
+    if (ids.size() == 0) {
+      return false;
     }
-
-    public Set<String> getActivePercolateTags(String index) {
-        Set<String> tags = new HashSet<String>();
-        SearchRequestBuilder searchBuilder = manager.getClient().prepareSearch("*").setIndices(index).setTypes(".percolator").setSize(1000);
-        SearchResponse response = searchBuilder.setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
-        SearchHits hits = response.getHits();
-        for(SearchHit hit : hits.getHits()) {
-            tags.add(hit.id());
-        }
-        return tags;
+    BulkRequestBuilder bulk = manager.getClient().prepareBulk();
+    for (String id : ids) {
+      bulk.add(manager.getClient().prepareDelete("_percolator", index, id));
     }
-
-    /**
-     *
-     * @param index
-     * @return
-     */
-    public boolean deleteOldQueries(String index) {
-        Set<String> tags = getActivePercolateTags(index);
-        if(tags.size() == 0) {
-            LOGGER.warn("No active tags were found in _percolator for index : {}", index);
-            return false;
-        }
-        LOGGER.info("Deleting {} tags.", tags.size());
-        BulkRequestBuilder bulk = manager.getClient().prepareBulk();
-        for(String tag : tags) {
-            bulk.add(manager.getClient().prepareDelete().setType(".percolator").setIndex(index).setId(tag));
-        }
-        BulkResponse response =bulk.execute().actionGet();
-        return !response.hasFailures();
+    return !bulk.execute().actionGet().hasFailures();
+  }
+
+  /**
+   * get active percolate tags.
+   * @param index index
+   * @return result
+   */
+  public Set<String> getActivePercolateTags(String index) {
+    Set<String> tags = new HashSet<String>();
+    SearchRequestBuilder searchBuilder = manager.getClient().prepareSearch("*").setIndices(index).setTypes(".percolator").setSize(1000);
+    SearchResponse response = searchBuilder.setQuery(QueryBuilders.matchAllQuery()).execute().actionGet();
+    SearchHits hits = response.getHits();
+    for (SearchHit hit : hits.getHits()) {
+      tags.add(hit.id());
     }
+    return tags;
+  }
+
+  /**
+   * delete old queries.
+   * @param index index
+   * @return result
+   */
+  public boolean deleteOldQueries(String index) {
+    Set<String> tags = getActivePercolateTags(index);
+    if (tags.size() == 0) {
+      LOGGER.warn("No active tags were found in _percolator for index : {}", index);
+      return false;
+    }
+    LOGGER.info("Deleting {} tags.", tags.size());
+    BulkRequestBuilder bulk = manager.getClient().prepareBulk();
+    for (String tag : tags) {
+      bulk.add(manager.getClient().prepareDelete().setType(".percolator").setIndex(index).setId(tag));
+    }
+    BulkResponse response = bulk.execute().actionGet();
+    return !response.hasFailures();
+  }
 
-    public static class PercolateQueryBuilder {
-
-        private QueryStringQueryBuilder queryBuilder;
-        private String id;
-
-        public PercolateQueryBuilder(String id, String query, String defaultPercolateField) {
-            this.id = id;
-            this.queryBuilder = new QueryStringQueryBuilder(query);
-            this.queryBuilder.defaultField(defaultPercolateField);
-        }
+  public static class PercolateQueryBuilder {
 
-        public String getId() {
-            return this.id;
-        }
+    private QueryStringQueryBuilder queryBuilder;
+    private String id;
 
-        public String getSource() {
-            return "{ \n\"query\" : "+this.queryBuilder.toString()+"\n}";
-        }
+    /**
+     * PercolateQueryBuilder constructor.
+     * @param id id under which the rule is indexed
+     * @param query query string for the rule
+     * @param defaultPercolateField default field the query searches
+     */
+    public PercolateQueryBuilder(String id, String query, String defaultPercolateField) {
+      this.id = id;
+      this.queryBuilder = new QueryStringQueryBuilder(query);
+      this.queryBuilder.defaultField(defaultPercolateField);
+    }
 
+    public String getId() {
+      return this.id;
     }
 
-    public enum FilterLevel {
-        MUST, SHOULD, MUST_NOT
+    public String getSource() {
+      return "{ \n\"query\" : " + this.queryBuilder.toString() + "\n}";
     }
+
+  }
+
+  public enum FilterLevel {
+    MUST, SHOULD, MUST_NOT
+  }
 }
\ No newline at end of file
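
The refactored class is easiest to follow from the nested builder outward. Below is a minimal sketch of its intended use, assuming only the PercolateTagProcessor.PercolateQueryBuilder constructor and accessors shown in the diff above; the tag id, query string, and field name are illustrative values, and the exact JSON that getSource() returns depends on the Elasticsearch version's rendering of QueryStringQueryBuilder.

import org.apache.streams.elasticsearch.processor.PercolateTagProcessor;

public class PercolateQueryBuilderSketch {
  public static void main(String[] args) {
    // Illustrative values: a tag id, a query_string query, and the default field.
    PercolateTagProcessor.PercolateQueryBuilder builder =
        new PercolateTagProcessor.PercolateQueryBuilder(
            "breaking-news",           // id the rule is stored under in .percolator
            "earthquake OR tsunami",   // query that tags matching documents
            "activity.content");       // default field the query runs against

    System.out.println(builder.getId());
    // getSource() wraps the rendered query in a { "query" : ... } envelope,
    // which addPercolateRule(...) then indexes as a percolator document.
    System.out.println(builder.getSource());
  }
}

During setup, each configured tag becomes one of these builders; addPercolateRule(...) queues it on the shared bulk request, writePercolateRules() flushes the bulk and returns false if any item failed, and cleanUp() optionally deletes the old queries before closing the client.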

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessorTest.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessorTest.java
 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessorTest.java
index f0d9c90..e252901 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessorTest.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/processor/PercolateTagProcessorTest.java
@@ -22,25 +22,30 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Unit Test for
+ * @see org.apache.streams.elasticsearch.processor.PercolateTagProcessor
+ */
 public class PercolateTagProcessorTest {
-    private final String id = "test_id";
-    private final String query = "test_query";
-    private final String defaultPercolateField = "activity.content";
-
-    private final String expectedResults = "{ \n" +
-            "\"query\" : {\n" +
-            "  \"query_string\" : {\n" +
-            "    \"query\" : \"test_query\",\n" +
-            "    \"default_field\" : \"activity.content\"\n" +
-            "  }\n" +
-            "}\n" +
-            "}";
-
-    @Test
-    public void percolateTagProcessorQueryBuilderTest() {
-        PercolateTagProcessor.PercolateQueryBuilder percolateQueryBuilder = new PercolateTagProcessor.PercolateQueryBuilder(id, query, defaultPercolateField);
-
-        assertEquals(id, percolateQueryBuilder.getId());
+
+  private final String id = "test_id";
+  private final String query = "test_query";
+  private final String defaultPercolateField = "activity.content";
+
+  private final String expectedResults = "{ \n" +
+      "\"query\" : {\n" +
+      "  \"query_string\" : {\n" +
+      "    \"query\" : \"test_query\",\n" +
+      "    \"default_field\" : \"activity.content\"\n" +
+      "  }\n" +
+      "}\n" +
+      "}";
+
+  @Test
+  public void percolateTagProcessorQueryBuilderTest() {
+    PercolateTagProcessor.PercolateQueryBuilder percolateQueryBuilder = new PercolateTagProcessor.PercolateQueryBuilder(id, query, defaultPercolateField);
+
+    assertEquals(id, percolateQueryBuilder.getId());
 //        assertEquals(expectedResults, percolateQueryBuilder.getSource());
-    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/DatumFromMetadataProcessorIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/DatumFromMetadataProcessorIT.java
 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/DatumFromMetadataProcessorIT.java
index c81d183..caa0b8d 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/DatumFromMetadataProcessorIT.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/DatumFromMetadataProcessorIT.java
@@ -18,16 +18,18 @@
 
 package org.apache.streams.elasticsearch.test;
 
-import com.google.common.collect.Maps;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigParseOptions;
-import org.apache.commons.lang.SerializationUtils;
 import org.apache.streams.config.ComponentConfigurator;
 import org.apache.streams.core.StreamsDatum;
 import org.apache.streams.elasticsearch.ElasticsearchClientManager;
 import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
 import org.apache.streams.elasticsearch.processor.DatumFromMetadataProcessor;
+
+import com.google.common.collect.Maps;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigParseOptions;
+
+import org.apache.commons.lang.SerializationUtils;
 import org.elasticsearch.client.Client;
 import org.junit.Assert;
 import org.junit.Before;
@@ -37,57 +39,58 @@ import java.io.File;
 import java.util.Map;
 
 /**
- * Created by sblackmon on 10/20/14.
+ * Integration Test for
+ * @see org.apache.streams.elasticsearch.processor.DatumFromMetadataProcessor
  */
 public class DatumFromMetadataProcessorIT {
 
-    private ElasticsearchReaderConfiguration testConfiguration;
-    protected Client testClient;
+  private ElasticsearchReaderConfiguration testConfiguration;
+  protected Client testClient;
 
-    @Test
-    public void testSerializability() {
-        DatumFromMetadataProcessor processor = new DatumFromMetadataProcessor(testConfiguration);
+  @Test
+  public void testSerializability() {
+    DatumFromMetadataProcessor processor = new DatumFromMetadataProcessor(testConfiguration);
 
-        DatumFromMetadataProcessor clone = (DatumFromMetadataProcessor) SerializationUtils.clone(processor);
-    }
+    DatumFromMetadataProcessor clone = (DatumFromMetadataProcessor) SerializationUtils.clone(processor);
+  }
 
-    @Before
-    public void prepareTest() throws Exception {
+  @Before
+  public void prepareTest() throws Exception {
 
-        Config reference  = ConfigFactory.load();
-        File conf_file = new File("target/test-classes/DatumFromMetadataProcessorIT.conf");
-        assert(conf_file.exists());
-        Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
-        Config typesafe  = testResourceConfig.withFallback(reference).resolve();
-        testConfiguration = new ComponentConfigurator<>(ElasticsearchReaderConfiguration.class).detectConfiguration(typesafe, "elasticsearch");
-        testClient = new ElasticsearchClientManager(testConfiguration).getClient();
+    Config reference  = ConfigFactory.load();
+    File conf_file = new File("target/test-classes/DatumFromMetadataProcessorIT.conf");
+    assert(conf_file.exists());
+    Config testResourceConfig  = ConfigFactory.parseFileAnySyntax(conf_file, ConfigParseOptions.defaults().setAllowMissing(false));
+    Config typesafe  = testResourceConfig.withFallback(reference).resolve();
+    testConfiguration = new ComponentConfigurator<>(ElasticsearchReaderConfiguration.class).detectConfiguration(typesafe, "elasticsearch");
+    testClient = new ElasticsearchClientManager(testConfiguration).getClient();
 
-    }
+  }
 
-    @Test
-    public void testDatumFromMetadataProcessor() {
+  @Test
+  public void testDatumFromMetadataProcessor() {
 
-        Map<String, Object> metadata = Maps.newHashMap();
+    Map<String, Object> metadata = Maps.newHashMap();
 
-        metadata.put("index", testConfiguration.getIndexes().get(0));
-        metadata.put("type", testConfiguration.getTypes().get(0));
-        metadata.put("id", "post");
+    metadata.put("index", testConfiguration.getIndexes().get(0));
+    metadata.put("type", testConfiguration.getTypes().get(0));
+    metadata.put("id", "post");
 
-        DatumFromMetadataProcessor processor = new DatumFromMetadataProcessor(testConfiguration);
+    DatumFromMetadataProcessor processor = new DatumFromMetadataProcessor(testConfiguration);
 
-        StreamsDatum testInput = new StreamsDatum(null);
+    StreamsDatum testInput = new StreamsDatum(null);
 
-        testInput.setMetadata(metadata);
+    testInput.setMetadata(metadata);
 
-        Assert.assertNull(testInput.document);
+    Assert.assertNull(testInput.document);
 
-        processor.prepare(null);
+    processor.prepare(null);
 
-        StreamsDatum testOutput = processor.process(testInput).get(0);
+    StreamsDatum testOutput = processor.process(testInput).get(0);
 
-        processor.cleanUp();
+    processor.cleanUp();
 
-        Assert.assertNotNull(testOutput.document);
+    Assert.assertNotNull(testOutput.document);
 
-    }
+  }
 }
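
A companion sketch of the metadata contract this processor relies on, assuming only what the test above shows: the processor locates a source document via the "index", "type", and "id" metadata keys and follows the prepare/process/cleanUp lifecycle. The bare configuration instance and the index/type/id values are placeholders for what the test resolves from DatumFromMetadataProcessorIT.conf; a real run needs a reachable elasticsearch cluster.

import org.apache.streams.core.StreamsDatum;
import org.apache.streams.elasticsearch.ElasticsearchReaderConfiguration;
import org.apache.streams.elasticsearch.processor.DatumFromMetadataProcessor;

import java.util.HashMap;
import java.util.Map;

public class DatumFromMetadataSketch {
  public static void main(String[] args) {
    // Placeholder configuration; normally resolved from typesafe config as above.
    ElasticsearchReaderConfiguration config = new ElasticsearchReaderConfiguration();

    // The processor reads these three metadata keys to fetch the document.
    Map<String, Object> metadata = new HashMap<>();
    metadata.put("index", "activity");  // placeholder index name
    metadata.put("type", "activity");   // placeholder type name
    metadata.put("id", "post");         // id of the document to fetch

    StreamsDatum datum = new StreamsDatum(null);  // document starts out null
    datum.setMetadata(metadata);

    DatumFromMetadataProcessor processor = new DatumFromMetadataProcessor(config);
    processor.prepare(null);
    StreamsDatum result = processor.process(datum).get(0);  // document now populated
    processor.cleanUp();
  }
}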

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchITs.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchITs.java
 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchITs.java
index 7c655db..b0e67a9 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchITs.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchITs.java
@@ -15,6 +15,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.streams.elasticsearch.test;
 
 import org.junit.runner.RunWith;
@@ -28,7 +29,10 @@ import org.junit.runners.Suite;
         ElasticsearchParentChildUpdaterIT.class,
         DatumFromMetadataProcessorIT.class
 })
-
+/**
+ * Integration Test Suite for
+ * @see org.apache.streams.elasticsearch
+ */
 public class ElasticsearchITs {
     // the class remains empty,
     // used only as a holder for the above annotations

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildUpdaterIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildUpdaterIT.java
 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildUpdaterIT.java
index 6344028..553a711 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildUpdaterIT.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildUpdaterIT.java
@@ -74,11 +74,13 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
 /**
- * Created by sblackmon on 10/20/14.
+ * Integration Test for
+ * @see org.apache.streams.elasticsearch.ElasticsearchPersistUpdater
+ * using parent/child associated documents.
  */
 public class ElasticsearchParentChildUpdaterIT {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchParentChildUpdaterIT.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchParentChildUpdaterIT.class);
 
     private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
 

http://git-wip-us.apache.org/repos/asf/incubator-streams/blob/5dffd5c3/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildWriterIT.java
----------------------------------------------------------------------
diff --git 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildWriterIT.java
 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildWriterIT.java
index 637fdfc..6b52ce5 100644
--- 
a/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildWriterIT.java
+++ 
b/streams-contrib/streams-persist-elasticsearch/src/test/java/org/apache/streams/elasticsearch/test/ElasticsearchParentChildWriterIT.java
@@ -66,11 +66,13 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 
 /**
- * Created by sblackmon on 10/20/14.
+ * Integration Test for
+ * @see org.apache.streams.elasticsearch.ElasticsearchPersistWriter
+ * using parent/child associated documents.
  */
 public class ElasticsearchParentChildWriterIT {
 
-    private final static Logger LOGGER = LoggerFactory.getLogger(ElasticsearchParentChildWriterIT.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchParentChildWriterIT.class);
 
     private static ObjectMapper MAPPER = StreamsJacksonMapper.getInstance();
 
