METRON-1419: Create a SolrDao (this closes apache/incubator-metron#911)

Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/23113a63
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/23113a63
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/23113a63

Branch: refs/heads/feature/METRON-1416-upgrade-solr
Commit: 23113a6337a3fc4d0bfbb708303b30bf8122f23f
Parents: 644e951
Author: merrimanr <merrim...@gmail.com>
Authored: Thu Feb 1 16:13:46 2018 -0500
Committer: cstella <ceste...@gmail.com>
Committed: Thu Feb 1 16:13:46 2018 -0500

----------------------------------------------------------------------
 dependencies_with_url.csv                       |    3 +
 .../elasticsearch/dao/ColumnMetadataDao.java    |   67 -
 .../dao/ElasticsearchColumnMetadataDao.java     |   31 +-
 .../elasticsearch/dao/ElasticsearchDao.java     |  651 +------
 .../dao/ElasticsearchSearchDao.java             |  565 ++++++
 .../dao/ElasticsearchUpdateDao.java             |  130 ++
 .../elasticsearch/dao/ElasticsearchDaoTest.java |    8 +-
 .../ElasticsearchSearchIntegrationTest.java     |  112 ++
 .../ElasticsearchUpdateIntegrationTest.java     |  219 +--
 .../metron/indexing/dao/ColumnMetadataDao.java  |   39 +
 .../metron/indexing/dao/search/SearchDao.java   |   34 +
 .../metron/indexing/dao/update/UpdateDao.java   |   30 +
 .../indexing/dao/SearchIntegrationTest.java     |  215 +--
 .../indexing/dao/UpdateIntegrationTest.java     |  199 +++
 metron-platform/metron-solr/pom.xml             |   37 +-
 .../metron/solr/dao/SolrColumnMetadataDao.java  |  120 ++
 .../org/apache/metron/solr/dao/SolrDao.java     |  117 ++
 .../apache/metron/solr/dao/SolrSearchDao.java   |  310 ++++
 .../apache/metron/solr/dao/SolrUpdateDao.java   |  100 ++
 .../SolrIndexingIntegrationTest.java            |   10 +-
 .../integration/SolrSearchIntegrationTest.java  |  153 ++
 .../integration/SolrUpdateIntegrationTest.java  |   73 +
 .../integration/components/SolrComponent.java   |   22 +-
 .../resources/config/bro/conf/managed-schema    |   50 +
 .../resources/config/bro/conf/solrconfig.xml    | 1601 ++++++++++++++++++
 .../resources/config/snort/conf/managed-schema  |   51 +
 .../resources/config/snort/conf/solrconfig.xml  | 1601 ++++++++++++++++++
 .../resources/config/test/conf/managed-schema   |   68 +
 .../resources/config/test/conf/solrconfig.xml   | 1601 ++++++++++++++++++
 pom.xml                                         |    2 +-
 30 files changed, 7251 insertions(+), 968 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/dependencies_with_url.csv
----------------------------------------------------------------------
diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv
index a1f431b..2bf1c76 100644
--- a/dependencies_with_url.csv
+++ b/dependencies_with_url.csv
@@ -21,6 +21,7 @@ com.esotericsoftware:reflectasm:jar:1.10.1:compile,New BSD License,http://code.g
 com.flipkart.zjsonpatch:zjsonpatch:jar:0.3.4:compile,Apache v2, https://github.com/flipkart-incubator/zjsonpatch
 com.google.protobuf:protobuf-java:jar:2.5.0:compile,New BSD license,http://code.google.com/p/protobuf
 com.google.protobuf:protobuf-java:jar:2.6.1:compile,New BSD license,http://code.google.com/p/protobuf
+com.google.protobuf:protobuf-java:jar:3.1.0:compile,New BSD license,http://code.google.com/p/protobuf
 com.jcraft:jsch:jar:0.1.42:compile,BSD,http://www.jcraft.com/jsch/
 com.maxmind.db:maxmind-db:jar:1.2.1:compile,CC-BY-SA 3.0,https://github.com/maxmind/MaxMind-DB
 com.maxmind.geoip2:geoip2:jar:2.8.0:compile,Apache v2,https://github.com/maxmind/GeoIP2-java
@@ -78,6 +79,7 @@ org.jvnet.jaxb2_commons:jaxb2-basics-runtime:jar:0.6.5:compile,BSD,https://githu
 org.krakenapps:kraken-api:jar:2.1.1:compile, Apache v2,
 org.krakenapps:kraken-pcap:jar:1.7.1:compile, Apache v2,
 org.ow2.asm:asm:jar:4.0:compile,BSD,http://asm.ow2.org/
+org.ow2.asm:asm:jar:5.1:compile,BSD,http://asm.ow2.org/
 org.slf4j:slf4j-api:jar:1.6.1:compile,MIT,http://www.slf4j.org
 org.slf4j:slf4j-api:jar:1.7.10:compile,MIT,http://www.slf4j.org
 org.slf4j:slf4j-api:jar:1.7.5:compile,MIT,http://www.slf4j.org
@@ -91,6 +93,7 @@ org.slf4j:slf4j-log4j12:jar:1.7.21:compile,MIT,http://www.slf4j.org
 org.slf4j:slf4j-log4j12:jar:1.7.5:compile,MIT,http://www.slf4j.org
 org.slf4j:slf4j-log4j12:jar:1.7.7:compile,MIT,http://www.slf4j.org
 org.slf4j:slf4j-simple:jar:1.7.7:compile,MIT,http://www.slf4j.org
+org.slf4j:jcl-over-slf4j:jar:1.7.7:compile,MIT,http://www.slf4j.org
 org.slf4j:jcl-over-slf4j:jar:1.7.21:compile,MIT,http://www.slf4j.org
 org.slf4j:jul-to-slf4j:jar:1.7.21:compile,MIT,http://www.slf4j.org
 aopalliance:aopalliance:jar:1.0:compile,Public Domain,http://aopalliance.sourceforge.net

http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ColumnMetadataDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ColumnMetadataDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ColumnMetadataDao.java
deleted file mode 100644
index 0393629..0000000
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ColumnMetadataDao.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.elasticsearch.dao;
-
-import org.apache.metron.indexing.dao.search.FieldType;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Responsible for retrieving column-level metadata about search indices.
- */
-public interface ColumnMetadataDao {
-
-  /**
-   * Retrieves column metadata for one or more search indices.
-   * @param indices The search indices to retrieve column metadata for.
-   * @return The column metadata, one set for each search index.
-   * @throws IOException
-   */
-  Map<String, FieldType> getColumnMetadata(List<String> indices) throws 
IOException;
-
-  /**
-   * Finds the latest version of a set of base indices.  This can be used to 
find
-   * the latest 'bro' index, for example.
-   *
-   * Assuming the following indices exist...
-   *
-   *    [
-   *      'bro_index_2017.10.03.19'
-   *      'bro_index_2017.10.03.20',
-   *      'bro_index_2017.10.03.21',
-   *      'snort_index_2017.10.03.19',
-   *      'snort_index_2017.10.03.20',
-   *      'snort_index_2017.10.03.21'
-   *    ]
-   *
-   *  And the include indices are given as...
-   *
-   *    ['bro', 'snort']
-   *
-   * Then the latest indices are...
-   *
-   *    ['bro_index_2017.10.03.21', 'snort_index_2017.10.03.21']
-   *
-   * @param includeIndices The base names of the indices to include
-   * @return The latest version of a set of indices.
-   */
-  String[] getLatestIndices(List<String> includeIndices);
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
index c12802e..6a8cad8 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
@@ -18,6 +18,7 @@
 
 package org.apache.metron.elasticsearch.dao;
 
+import org.apache.metron.indexing.dao.ColumnMetadataDao;
 import org.apache.metron.indexing.dao.search.FieldType;
 import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
 import org.elasticsearch.client.AdminClient;
@@ -140,12 +141,32 @@ public class ElasticsearchColumnMetadataDao implements ColumnMetadataDao {
   }
 
   /**
-   * Retrieves the latest indices.
-   * @param includeIndices
-   * @return
+   * Finds the latest version of a set of base indices.  This can be used to find
+   * the latest 'bro' index, for example.
+   *
+   * Assuming the following indices exist...
+   *
+   *    [
+   *      'bro_index_2017.10.03.19',
+   *      'bro_index_2017.10.03.20',
+   *      'bro_index_2017.10.03.21',
+   *      'snort_index_2017.10.03.19',
+   *      'snort_index_2017.10.03.20',
+   *      'snort_index_2017.10.03.21'
+   *    ]
+   *
+   *  And the include indices are given as...
+   *
+   *    ['bro', 'snort']
+   *
+   * Then the latest indices are...
+   *
+   *    ['bro_index_2017.10.03.21', 'snort_index_2017.10.03.21']
+   *
+   * @param includeIndices The base names of the indices to include
+   * @return The latest version of a set of indices.
    */
-  @Override
-  public String[] getLatestIndices(List<String> includeIndices) {
+  String[] getLatestIndices(List<String> includeIndices) {
     LOG.debug("Getting latest indices; indices={}", includeIndices);
     Map<String, String> latestIndices = new HashMap<>();
     String[] indices = adminClient

http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
index 650462e..7d6a9e5 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
@@ -17,96 +17,39 @@
  */
 package org.apache.metron.elasticsearch.dao;
 
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.IndexDao;
 import org.apache.metron.indexing.dao.search.FieldType;
 import org.apache.metron.indexing.dao.search.GetRequest;
-import org.apache.metron.indexing.dao.search.Group;
-import org.apache.metron.indexing.dao.search.GroupOrder;
-import org.apache.metron.indexing.dao.search.GroupOrderType;
 import org.apache.metron.indexing.dao.search.GroupRequest;
 import org.apache.metron.indexing.dao.search.GroupResponse;
-import org.apache.metron.indexing.dao.search.GroupResult;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
-import org.apache.metron.indexing.dao.search.SearchResult;
-import org.apache.metron.indexing.dao.search.SortField;
-import org.apache.metron.indexing.dao.search.SortOrder;
 import org.apache.metron.indexing.dao.update.Document;
-import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.action.search.SearchRequestBuilder;
-import 
org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
 import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.cluster.metadata.MappingMetaData;
-import org.elasticsearch.common.collect.ImmutableOpenMap;
-import org.elasticsearch.index.mapper.LegacyIpFieldMapper;
-import org.elasticsearch.index.query.IdsQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.elasticsearch.index.query.QueryStringQueryBuilder;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.SearchHits;
-import org.elasticsearch.search.aggregations.Aggregation;
-import org.elasticsearch.search.aggregations.AggregationBuilders;
-import org.elasticsearch.search.aggregations.Aggregations;
-import org.elasticsearch.search.aggregations.bucket.terms.Terms;
-import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
-import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order;
-import 
org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
-import org.elasticsearch.search.aggregations.metrics.sum.Sum;
-import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.elasticsearch.search.sort.FieldSortBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.function.Function;
-
-import static 
org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER;
-
 public class ElasticsearchDao implements IndexDao {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-  /**
-   * The value required to ensure that Elasticsearch sorts missing values last.
-   */
-  private static final String SORT_MISSING_LAST = "_last";
-
-  /**
-   * The value required to ensure that Elasticsearch sorts missing values last.
-   */
-  private static final String SORT_MISSING_FIRST = "_first";
-
-  /**
-   * The Elasticsearch client.
-   */
   private transient TransportClient client;
+  private ElasticsearchSearchDao searchDao;
+  private ElasticsearchUpdateDao updateDao;
 
   /**
    * Retrieves column metadata about search indices.
    */
-  private ColumnMetadataDao columnMetadataDao;
+  private ElasticsearchColumnMetadataDao columnMetadataDao;
 
   /**
    * Handles the submission of search requests to Elasticsearch.
@@ -116,10 +59,15 @@ public class ElasticsearchDao implements IndexDao {
   private AccessConfig accessConfig;
 
   protected ElasticsearchDao(TransportClient client,
-                             ColumnMetadataDao columnMetadataDao,
-                             ElasticsearchRequestSubmitter requestSubmitter,
-                             AccessConfig config) {
+      AccessConfig config,
+      ElasticsearchSearchDao searchDao,
+      ElasticsearchUpdateDao updateDao,
+      ElasticsearchColumnMetadataDao columnMetadataDao,
+      ElasticsearchRequestSubmitter requestSubmitter
+                             ) {
     this.client = client;
+    this.searchDao = searchDao;
+    this.updateDao = updateDao;
     this.columnMetadataDao = columnMetadataDao;
     this.requestSubmitter = requestSubmitter;
     this.accessConfig = config;
@@ -129,266 +77,14 @@ public class ElasticsearchDao implements IndexDao {
     //uninitialized.
   }
 
-  private static Map<String, FieldType> elasticsearchSearchTypeMap;
-
-  static {
-    Map<String, FieldType> fieldTypeMap = new HashMap<>();
-    fieldTypeMap.put("text", FieldType.TEXT);
-    fieldTypeMap.put("keyword", FieldType.KEYWORD);
-    fieldTypeMap.put("ip", FieldType.IP);
-    fieldTypeMap.put("integer", FieldType.INTEGER);
-    fieldTypeMap.put("long", FieldType.LONG);
-    fieldTypeMap.put("date", FieldType.DATE);
-    fieldTypeMap.put("float", FieldType.FLOAT);
-    fieldTypeMap.put("double", FieldType.DOUBLE);
-    fieldTypeMap.put("boolean", FieldType.BOOLEAN);
-    elasticsearchSearchTypeMap = Collections.unmodifiableMap(fieldTypeMap);
-  }
-
-  @Override
-  public SearchResponse search(SearchRequest searchRequest) throws 
InvalidSearchException {
-    if(searchRequest.getQuery() == null) {
-      throw new InvalidSearchException("Search query is invalid: null");
-    }
-    return search(searchRequest, new 
QueryStringQueryBuilder(searchRequest.getQuery()));
-  }
-
-  /**
-   * Defers to a provided {@link org.elasticsearch.index.query.QueryBuilder} 
for the query.
-   * @param request The request defining the parameters of the search
-   * @param queryBuilder The actual query to be run. Intended for if the 
SearchRequest requires wrapping
-   * @return The results of the query
-   * @throws InvalidSearchException When the query is malformed or the current 
state doesn't allow search
-   */
-  protected SearchResponse search(SearchRequest request, QueryBuilder 
queryBuilder) throws InvalidSearchException {
-    org.elasticsearch.action.search.SearchRequest esRequest;
-    org.elasticsearch.action.search.SearchResponse esResponse;
-
-    if(client == null) {
-      throw new InvalidSearchException("Uninitialized Dao!  You must call 
init() prior to use.");
-    }
-
-    if (request.getSize() > accessConfig.getMaxSearchResults()) {
-      throw new InvalidSearchException("Search result size must be less than " 
+ accessConfig.getMaxSearchResults());
-    }
-
-    esRequest = buildSearchRequest(request, queryBuilder);
-    esResponse = requestSubmitter.submitSearch(esRequest);
-    return buildSearchResponse(request, esResponse);
-  }
-
-  /**
-   * Builds an Elasticsearch search request.
-   * @param searchRequest The Metron search request.
-   * @param queryBuilder
-   * @return An Elasticsearch search request.
-   */
-  private org.elasticsearch.action.search.SearchRequest buildSearchRequest(
-          SearchRequest searchRequest,
-          QueryBuilder queryBuilder) throws InvalidSearchException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Got search request; request={}", 
ElasticsearchUtils.toJSON(searchRequest).orElse("???"));
-    }
-    SearchSourceBuilder searchBuilder = new SearchSourceBuilder()
-            .size(searchRequest.getSize())
-            .from(searchRequest.getFrom())
-            .query(queryBuilder)
-            .trackScores(true);
-    Optional<List<String>> fields = searchRequest.getFields();
-    // column metadata needed to understand the type of each sort field
-    Map<String, FieldType> meta;
-    try {
-      meta = getColumnMetadata(searchRequest.getIndices());
-    } catch(IOException e) {
-      throw new InvalidSearchException("Unable to get column metadata", e);
-    }
-
-    // handle sort fields
-    for(SortField sortField : searchRequest.getSort()) {
-
-      // what type is the sort field?
-      FieldType sortFieldType = meta.getOrDefault(sortField.getField(), 
FieldType.OTHER);
-
-      // sort order - if ascending missing values sorted last. otherwise, 
missing values sorted first
-      org.elasticsearch.search.sort.SortOrder sortOrder = 
getElasticsearchSortOrder(sortField.getSortOrder());
-      String missingSortOrder;
-      if(sortOrder == org.elasticsearch.search.sort.SortOrder.DESC) {
-        missingSortOrder = SORT_MISSING_LAST;
-      } else {
-        missingSortOrder = SORT_MISSING_FIRST;
-      }
-
-      // sort by the field - missing fields always last
-      FieldSortBuilder sortBy = new FieldSortBuilder(sortField.getField())
-              .order(sortOrder)
-              .missing(missingSortOrder)
-              .unmappedType(sortFieldType.getFieldType());
-      searchBuilder.sort(sortBy);
-    }
-
-    // handle search fields
-    if (fields.isPresent()) {
-      searchBuilder.fetchSource("*", null);
-    } else {
-      searchBuilder.fetchSource(true);
-    }
-
-    Optional<List<String>> facetFields = searchRequest.getFacetFields();
-
-    // handle facet fields
-    if (searchRequest.getFacetFields().isPresent()) {
-      // 
https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/_bucket_aggregations.html
-      for(String field : searchRequest.getFacetFields().get()) {
-        String name = getFacetAggregationName(field);
-        TermsAggregationBuilder terms = AggregationBuilders.terms( 
name).field(field);
-               // new TermsBuilder(name).field(field);
-        searchBuilder.aggregation(terms);
-      }
-    }
-
-    // return the search request
-    String[] indices = wildcardIndices(searchRequest.getIndices());
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Built Elasticsearch request; indices={}, request={}", 
indices, searchBuilder.toString());
-    }
-    return new org.elasticsearch.action.search.SearchRequest()
-            .indices(indices)
-            .source(searchBuilder);
-  }
-
-  /**
-   * Builds a search response.
-   *
-   * This effectively transforms an Elasticsearch search response into a 
Metron search response.
-   *
-   * @param searchRequest The Metron search request.
-   * @param esResponse The Elasticsearch search response.
-   * @return A Metron search response.
-   * @throws InvalidSearchException
-   */
-  private SearchResponse buildSearchResponse(
-          SearchRequest searchRequest,
-          org.elasticsearch.action.search.SearchResponse esResponse) throws 
InvalidSearchException {
-
-    SearchResponse searchResponse = new SearchResponse();
-
-    searchResponse.setTotal(esResponse.getHits().getTotalHits());
-
-    // search hits --> search results
-    List<SearchResult> results = new ArrayList<>();
-    for(SearchHit hit: esResponse.getHits().getHits()) {
-      results.add(getSearchResult(hit, searchRequest.getFields()));
-    }
-    searchResponse.setResults(results);
-
-    // handle facet fields
-    if (searchRequest.getFacetFields().isPresent()) {
-      List<String> facetFields = searchRequest.getFacetFields().get();
-      Map<String, FieldType> commonColumnMetadata;
-      try {
-        commonColumnMetadata = getColumnMetadata(searchRequest.getIndices());
-      } catch (IOException e) {
-        throw new InvalidSearchException(String.format(
-                "Could not get common column metadata for indices %s",
-                Arrays.toString(searchRequest.getIndices().toArray())));
-      }
-      searchResponse.setFacetCounts(getFacetCounts(facetFields, 
esResponse.getAggregations(), commonColumnMetadata ));
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Built search response; response={}", 
ElasticsearchUtils.toJSON(searchResponse).orElse("???"));
-    }
-    return searchResponse;
-  }
-
-  @Override
-  public GroupResponse group(GroupRequest groupRequest) throws 
InvalidSearchException {
-    return group(groupRequest, new 
QueryStringQueryBuilder(groupRequest.getQuery()));
-  }
-
-  /**
-   * Defers to a provided {@link org.elasticsearch.index.query.QueryBuilder} 
for the query.
-   * @param groupRequest The request defining the parameters of the grouping
-   * @param queryBuilder The actual query to be run. Intended for if the 
SearchRequest requires wrapping
-   * @return The results of the query
-   * @throws InvalidSearchException When the query is malformed or the current 
state doesn't allow search
-   */
-  protected GroupResponse group(GroupRequest groupRequest, QueryBuilder 
queryBuilder)
-      throws InvalidSearchException {
-    org.elasticsearch.action.search.SearchRequest esRequest;
-    org.elasticsearch.action.search.SearchResponse esResponse;
-
-    if (client == null) {
-      throw new InvalidSearchException("Uninitialized Dao!  You must call 
init() prior to use.");
-    }
-    if (groupRequest.getGroups() == null || groupRequest.getGroups().size() == 
0) {
-      throw new InvalidSearchException("At least 1 group must be provided.");
-    }
-
-    esRequest = buildGroupRequest(groupRequest, queryBuilder);
-    esResponse = requestSubmitter.submitSearch(esRequest);
-    GroupResponse response = buildGroupResponse(groupRequest, esResponse);
-
-    return response;
-  }
-
-  /**
-   * Builds a group search request.
-   * @param groupRequest The Metron group request.
-   * @param queryBuilder The search query.
-   * @return An Elasticsearch search request.
-   */
-  private org.elasticsearch.action.search.SearchRequest buildGroupRequest(
-          GroupRequest groupRequest,
-          QueryBuilder queryBuilder) {
-
-    // handle groups
-    TermsAggregationBuilder groups = getGroupsTermBuilder(groupRequest, 0);
-    final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
-            .query(queryBuilder)
-            .aggregation(groups);
-
-    // return the search request
-    String[] indices = wildcardIndices(groupRequest.getIndices());
-    return new org.elasticsearch.action.search.SearchRequest()
-            .indices(indices)
-            .source(searchSourceBuilder);
-  }
-
-  /**
-   * Build a group response.
-   * @param groupRequest The original group request.
-   * @param response The search response.
-   * @return A group response.
-   * @throws InvalidSearchException
-   */
-  private GroupResponse buildGroupResponse(
-          GroupRequest groupRequest,
-          org.elasticsearch.action.search.SearchResponse response) throws 
InvalidSearchException {
-
-    // build the search response
-    Map<String, FieldType> commonColumnMetadata;
-    try {
-      commonColumnMetadata = getColumnMetadata(groupRequest.getIndices());
-    } catch (IOException e) {
-      throw new InvalidSearchException(String.format("Could not get common 
column metadata for indices %s",
-              Arrays.toString(groupRequest.getIndices().toArray())));
-    }
-
-    GroupResponse groupResponse = new GroupResponse();
-    groupResponse.setGroupedBy(groupRequest.getGroups().get(0).getField());
-    groupResponse.setGroupResults(getGroupResults(groupRequest, 0, 
response.getAggregations(), commonColumnMetadata));
-    return groupResponse;
+  public ElasticsearchDao columnMetadataDao(ElasticsearchColumnMetadataDao 
columnMetadataDao) {
+    this.columnMetadataDao = columnMetadataDao;
+    return this;
   }
 
-  private String[] wildcardIndices(List<String> indices) {
-    if(indices == null)
-      return new String[] {};
-
-    return indices
-            .stream()
-            .map(index -> String.format("%s%s*", index, INDEX_NAME_DELIMITER))
-            .toArray(value -> new String[indices.size()]);
+  public ElasticsearchDao accessConfig(AccessConfig accessConfig) {
+    this.accessConfig = accessConfig;
+    return this;
   }
 
   @Override
@@ -398,6 +94,8 @@ public class ElasticsearchDao implements IndexDao {
       this.accessConfig = config;
       this.columnMetadataDao = new 
ElasticsearchColumnMetadataDao(this.client.admin());
       this.requestSubmitter = new ElasticsearchRequestSubmitter(this.client);
+      this.searchDao = new ElasticsearchSearchDao(client, accessConfig, 
columnMetadataDao, requestSubmitter);
+      this.updateDao = new ElasticsearchUpdateDao(client, accessConfig, 
searchDao);
     }
 
     if(columnMetadataDao == null) {
@@ -410,317 +108,54 @@ public class ElasticsearchDao implements IndexDao {
   }
 
   @Override
-  public Document getLatest(final String guid, final String sensorType) throws 
IOException {
-    Optional<Document> doc = searchByGuid(guid, sensorType, hit -> 
toDocument(guid, hit));
-    return doc.orElse(null);
+  public SearchResponse search(SearchRequest searchRequest) throws 
InvalidSearchException {
+    return this.searchDao.search(searchRequest);
   }
 
-  private Optional<Document> toDocument(final String guid, SearchHit hit) {
-    Long ts = 0L;
-    String doc = hit.getSourceAsString();
-    String sourceType = toSourceType(hit.getType());
-    try {
-      return Optional.of(new Document(doc, guid, sourceType, ts));
-    } catch (IOException e) {
-      throw new IllegalStateException("Unable to retrieve latest: " + 
e.getMessage(), e);
-    }
+  @Override
+  public GroupResponse group(GroupRequest groupRequest) throws 
InvalidSearchException {
+    return this.searchDao.group(groupRequest);
   }
 
-  /**
-   * Returns the source type based on a given doc type.
-   * @param docType The document type.
-   * @return The source type.
-   */
-  private String toSourceType(String docType) {
-    return Iterables.getFirst(Splitter.on("_doc").split(docType), null);
+  @Override
+  public Document getLatest(final String guid, final String sensorType) throws 
IOException {
+    return searchDao.getLatest(guid, sensorType);
   }
 
   @Override
   public Iterable<Document> getAllLatest(
       final List<GetRequest> getRequests) throws IOException {
-    Collection<String> guids = new HashSet<>();
-    Collection<String> sensorTypes = new HashSet<>();
-    for (GetRequest getRequest: getRequests) {
-      guids.add(getRequest.getGuid());
-      sensorTypes.add(getRequest.getSensorType());
-    }
-    List<Document> documents = searchByGuids(
-        guids
-        , sensorTypes
-        , hit -> {
-          Long ts = 0L;
-          String doc = hit.getSourceAsString();
-          String sourceType = 
Iterables.getFirst(Splitter.on("_doc").split(hit.getType()), null);
-          try {
-            return Optional.of(new Document(doc, hit.getId(), sourceType, ts));
-          } catch (IOException e) {
-            throw new IllegalStateException("Unable to retrieve latest: " + 
e.getMessage(), e);
-          }
-        }
-
-    );
-    return documents;
-  }
-
-  <T> Optional<T> searchByGuid(String guid, String sensorType,
-      Function<SearchHit, Optional<T>> callback) {
-    Collection<String> sensorTypes = sensorType != null ? 
Collections.singleton(sensorType) : null;
-    List<T> results = searchByGuids(Collections.singleton(guid), sensorTypes, 
callback);
-    if (results.size() > 0) {
-      return Optional.of(results.get(0));
-    } else {
-      return Optional.empty();
-    }
-  }
-
-  /**
-   * Return the search hit based on the UUID and sensor type.
-   * A callback can be specified to transform the hit into a type T.
-   * If more than one hit happens, the first one will be returned.
-   */
-  <T> List<T> searchByGuids(Collection<String> guids, Collection<String> 
sensorTypes,
-      Function<SearchHit, Optional<T>> callback) {
-    if(guids == null || guids.isEmpty()) {
-      return Collections.EMPTY_LIST;
-    }
-    QueryBuilder query = null;
-    IdsQueryBuilder idsQuery = null;
-    if (sensorTypes != null) {
-      String[] types = sensorTypes.stream().map(sensorType -> sensorType + 
"_doc").toArray(String[]::new);
-      idsQuery = QueryBuilders.idsQuery(types);
-    } else {
-      idsQuery = QueryBuilders.idsQuery();
-    }
-
-    for(String guid : guids) {
-        query = idsQuery.addIds(guid);
-    }
-
-    SearchRequestBuilder request = client.prepareSearch()
-                                         .setQuery(query)
-                                         .setSize(guids.size())
-                                         ;
-    org.elasticsearch.action.search.SearchResponse response = request.get();
-    SearchHits hits = response.getHits();
-    List<T> results = new ArrayList<>();
-    for (SearchHit hit : hits) {
-      Optional<T> result = callback.apply(hit);
-      if (result.isPresent()) {
-        results.add(result.get());
-      }
-    }
-    return results;
+    return searchDao.getAllLatest(getRequests);
   }
 
   @Override
   public void update(Document update, Optional<String> index) throws 
IOException {
-    String indexPostfix = ElasticsearchUtils
-        
.getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new 
Date());
-    String sensorType = update.getSensorType();
-    String indexName = getIndexName(update, index, indexPostfix);
-
-    IndexRequest indexRequest = buildIndexRequest(update, sensorType, 
indexName);
-    try {
-      IndexResponse response = client.index(indexRequest).get();
-
-      ShardInfo shardInfo = response.getShardInfo();
-      int failed = shardInfo.getFailed();
-      if (failed > 0) {
-        throw new IOException(
-            "ElasticsearchDao index failed: " + 
Arrays.toString(shardInfo.getFailures()));
-      }
-    } catch (Exception e) {
-      throw new IOException(e.getMessage(), e);
-    }
+    updateDao.update(update, index);
   }
 
   @Override
   public void batchUpdate(Map<Document, Optional<String>> updates) throws 
IOException {
-    String indexPostfix = ElasticsearchUtils
-        
.getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new 
Date());
-
-    BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
-
-    // Get the indices we'll actually be using for each Document.
-    for (Map.Entry<Document, Optional<String>> updateEntry : 
updates.entrySet()) {
-      Document update = updateEntry.getKey();
-      String sensorType = update.getSensorType();
-      String indexName = getIndexName(update, updateEntry.getValue(), 
indexPostfix);
-      IndexRequest indexRequest = buildIndexRequest(
-          update,
-          sensorType,
-          indexName
-      );
-
-      bulkRequestBuilder.add(indexRequest);
-    }
-
-    BulkResponse bulkResponse = bulkRequestBuilder.get();
-    if (bulkResponse.hasFailures()) {
-      LOG.error("Bulk Request has failures: {}", 
bulkResponse.buildFailureMessage());
-      throw new IOException(
-          "ElasticsearchDao upsert failed: " + 
bulkResponse.buildFailureMessage());
-    }
-  }
-
-  protected String getIndexName(Document update, Optional<String> index, 
String indexPostFix) {
-      return index.orElse(getIndexName(update.getGuid(), 
update.getSensorType())
-                  
.orElse(ElasticsearchUtils.getIndexName(update.getSensorType(), indexPostFix, 
null))
-      );
-  }
-
-  protected Optional<String> getIndexName(String guid, String sensorType) {
-    return searchByGuid(guid,
-        sensorType,
-        hit -> Optional.ofNullable(hit.getIndex())
-    );
-  }
-
-  protected IndexRequest buildIndexRequest(Document update, String sensorType, 
String indexName) {
-    String type = sensorType + "_doc";
-    Object ts = update.getTimestamp();
-    IndexRequest indexRequest = new IndexRequest(indexName, type, 
update.getGuid())
-        .source(update.getDocument());
-    if(ts != null) {
-      indexRequest = indexRequest.timestamp(ts.toString());
-    }
-
-    return indexRequest;
+    updateDao.batchUpdate(updates);
   }
 
   @Override
   public Map<String, FieldType> getColumnMetadata(List<String> indices) throws 
IOException {
-    return columnMetadataDao.getColumnMetadata(indices);
-  }
-
-  private org.elasticsearch.search.sort.SortOrder getElasticsearchSortOrder(
-      org.apache.metron.indexing.dao.search.SortOrder sortOrder) {
-    return sortOrder == org.apache.metron.indexing.dao.search.SortOrder.DESC ?
-        org.elasticsearch.search.sort.SortOrder.DESC : 
org.elasticsearch.search.sort.SortOrder.ASC;
-  }
-
-  private Order getElasticsearchGroupOrder(GroupOrder groupOrder) {
-    if (groupOrder.getGroupOrderType() == GroupOrderType.TERM) {
-      return groupOrder.getSortOrder() == SortOrder.ASC ? Order.term(true) : 
Order.term(false);
-    } else {
-      return groupOrder.getSortOrder() == SortOrder.ASC ? Order.count(true) : 
Order.count(false);
-    }
-  }
-
-  public Map<String, Map<String, Long>> getFacetCounts(List<String> fields, 
Aggregations aggregations, Map<String, FieldType> commonColumnMetadata) {
-    Map<String, Map<String, Long>> fieldCounts = new HashMap<>();
-    for (String field: fields) {
-      Map<String, Long> valueCounts = new HashMap<>();
-      Aggregation aggregation = 
aggregations.get(getFacetAggregationName(field));
-      if (aggregation instanceof Terms) {
-        Terms terms = (Terms) aggregation;
-        terms.getBuckets().stream().forEach(bucket -> 
valueCounts.put(formatKey(bucket.getKey(), commonColumnMetadata.get(field)), 
bucket.getDocCount()));
-      }
-      fieldCounts.put(field, valueCounts);
-    }
-    return fieldCounts;
-  }
-
-  private String formatKey(Object key, FieldType type) {
-    if (FieldType.IP.equals(type) && key instanceof Long) {
-      return LegacyIpFieldMapper.longToIp((Long) key);
-    } else if (FieldType.BOOLEAN.equals(type)) {
-      return (Long) key == 1 ? "true" : "false";
-    } else {
-      return key.toString();
-    }
-  }
-
-  private TermsAggregationBuilder getGroupsTermBuilder(GroupRequest 
groupRequest, int index) {
-    List<Group> groups = groupRequest.getGroups();
-    Group group = groups.get(index);
-    String aggregationName = getGroupByAggregationName(group.getField());
-    TermsAggregationBuilder termsBuilder = 
AggregationBuilders.terms(aggregationName);
-    termsBuilder
-        .field(group.getField())
-        .size(accessConfig.getMaxSearchGroups())
-        .order(getElasticsearchGroupOrder(group.getOrder()));
-    if (index < groups.size() - 1) {
-      termsBuilder.subAggregation(getGroupsTermBuilder(groupRequest, index + 
1));
-    }
-    Optional<String> scoreField = groupRequest.getScoreField();
-    if (scoreField.isPresent()) {
-      SumAggregationBuilder scoreSumAggregationBuilder = 
AggregationBuilders.sum(getSumAggregationName(scoreField.get())).field(scoreField.get()).missing(0);
-      termsBuilder.subAggregation(scoreSumAggregationBuilder);
-    }
-    return termsBuilder;
+    return this.columnMetadataDao.getColumnMetadata(indices);
   }
 
-  private List<GroupResult> getGroupResults(GroupRequest groupRequest, int 
index, Aggregations aggregations, Map<String, FieldType> commonColumnMetadata) {
-    List<Group> groups = groupRequest.getGroups();
-    String field = groups.get(index).getField();
-    Terms terms = aggregations.get(getGroupByAggregationName(field));
-    List<GroupResult> searchResultGroups = new ArrayList<>();
-    for(Bucket bucket: terms.getBuckets()) {
-      GroupResult groupResult = new GroupResult();
-      groupResult.setKey(formatKey(bucket.getKey(), 
commonColumnMetadata.get(field)));
-      groupResult.setTotal(bucket.getDocCount());
-      Optional<String> scoreField = groupRequest.getScoreField();
-      if (scoreField.isPresent()) {
-        Sum score = 
bucket.getAggregations().get(getSumAggregationName(scoreField.get()));
-        groupResult.setScore(score.getValue());
-      }
-      if (index < groups.size() - 1) {
-        groupResult.setGroupedBy(groups.get(index + 1).getField());
-        groupResult.setGroupResults(getGroupResults(groupRequest, index + 1, 
bucket.getAggregations(), commonColumnMetadata));
-      }
-      searchResultGroups.add(groupResult);
-    }
-    return searchResultGroups;
+  protected Optional<String> getIndexName(String guid, String sensorType) {
+    return updateDao.getIndexName(guid, sensorType);
   }
 
-  private SearchResult getSearchResult(SearchHit searchHit, 
Optional<List<String>> fields) {
-    SearchResult searchResult = new SearchResult();
-    searchResult.setId(searchHit.getId());
-    Map<String, Object> source;
-    if (fields.isPresent()) {
-      Map<String, Object> resultSourceAsMap = searchHit.getSourceAsMap();
-      source = new HashMap<>();
-      fields.get().forEach(field -> {
-        source.put(field, resultSourceAsMap.get(field));
-      });
-    } else {
-      source = searchHit.getSource();
-    }
-    searchResult.setSource(source);
-    searchResult.setScore(searchHit.getScore());
-    searchResult.setIndex(searchHit.getIndex());
-    return searchResult;
+  protected SearchResponse search(SearchRequest request, QueryBuilder 
queryBuilder) throws InvalidSearchException {
+    return searchDao.search(request, queryBuilder);
   }
 
-  private String getFacetAggregationName(String field) {
-    return String.format("%s_count", field);
+  protected GroupResponse group(GroupRequest groupRequest, QueryBuilder 
queryBuilder) throws InvalidSearchException {
+    return searchDao.group(groupRequest, queryBuilder);
   }
 
   public TransportClient getClient() {
-    return client;
-  }
-
-  private String getGroupByAggregationName(String field) {
-    return String.format("%s_group", field);
-  }
-
-  private String getSumAggregationName(String field) {
-    return String.format("%s_score", field);
-  }
-
-  public ElasticsearchDao client(TransportClient client) {
-    this.client = client;
-    return this;
-  }
-
-  public ElasticsearchDao columnMetadataDao(ColumnMetadataDao 
columnMetadataDao) {
-    this.columnMetadataDao = columnMetadataDao;
-    return this;
-  }
-
-  public ElasticsearchDao accessConfig(AccessConfig accessConfig) {
-    this.accessConfig = accessConfig;
-    return this;
+    return this.client;
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
new file mode 100644
index 0000000..5e9ed02
--- /dev/null
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java
@@ -0,0 +1,565 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.dao;
+
+import static 
org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
+import org.apache.metron.indexing.dao.AccessConfig;
+import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.GetRequest;
+import org.apache.metron.indexing.dao.search.Group;
+import org.apache.metron.indexing.dao.search.GroupOrder;
+import org.apache.metron.indexing.dao.search.GroupOrderType;
+import org.apache.metron.indexing.dao.search.GroupRequest;
+import org.apache.metron.indexing.dao.search.GroupResponse;
+import org.apache.metron.indexing.dao.search.GroupResult;
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
+import org.apache.metron.indexing.dao.search.SearchDao;
+import org.apache.metron.indexing.dao.search.SearchRequest;
+import org.apache.metron.indexing.dao.search.SearchResponse;
+import org.apache.metron.indexing.dao.search.SearchResult;
+import org.apache.metron.indexing.dao.search.SortField;
+import org.apache.metron.indexing.dao.search.SortOrder;
+import org.apache.metron.indexing.dao.update.Document;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.index.mapper.LegacyIpFieldMapper;
+import org.elasticsearch.index.query.IdsQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.QueryStringQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import org.elasticsearch.search.aggregations.Aggregation;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
+import org.elasticsearch.search.aggregations.Aggregations;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
+import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order;
+import 
org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
+import org.elasticsearch.search.aggregations.metrics.sum.Sum;
+import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.FieldSortBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticsearchSearchDao implements SearchDao {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * The value required to ensure that Elasticsearch sorts missing values last.
+   */
+  private static final String SORT_MISSING_LAST = "_last";
+
+  /**
+   * The value required to ensure that Elasticsearch sorts missing values first.
+   */
+  private static final String SORT_MISSING_FIRST = "_first";
+
+  private transient TransportClient client;
+  private AccessConfig accessConfig;
+  private ElasticsearchColumnMetadataDao columnMetadataDao;
+  private ElasticsearchRequestSubmitter requestSubmitter;
+
+  public ElasticsearchSearchDao(TransportClient client,
+      AccessConfig accessConfig,
+      ElasticsearchColumnMetadataDao columnMetadataDao,
+      ElasticsearchRequestSubmitter requestSubmitter) {
+    this.client = client;
+    this.accessConfig = accessConfig;
+    this.columnMetadataDao = columnMetadataDao;
+    this.requestSubmitter = requestSubmitter;
+  }
+
+  @Override
+  public SearchResponse search(SearchRequest searchRequest) throws 
InvalidSearchException {
+    if(searchRequest.getQuery() == null) {
+      throw new InvalidSearchException("Search query is invalid: null");
+    }
+    return search(searchRequest, new 
QueryStringQueryBuilder(searchRequest.getQuery()));
+  }
+
+  /**
+   * Groups the documents matching the request's query string.
+   *
+   * @param groupRequest The Metron group request; its query must not be null.
+   * @return The grouped results.
+   * @throws InvalidSearchException If the query is null or the delegated group call rejects the request.
+   */
+  @Override
+  public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException {
+    // mirror search(SearchRequest): reject a missing query before building the query string
+    if (groupRequest.getQuery() == null) {
+      throw new InvalidSearchException("Group query is invalid: null");
+    }
+    return group(groupRequest, new QueryStringQueryBuilder(groupRequest.getQuery()));
+  }
+
+  @Override
+  public Document getLatest(String guid, String sensorType) throws IOException 
{
+    Optional<Document> doc = searchByGuid(guid, sensorType, hit -> 
toDocument(guid, hit));
+    return doc.orElse(null);
+  }
+
+  <T> Optional<T> searchByGuid(String guid, String sensorType,
+      Function<SearchHit, Optional<T>> callback) {
+    Collection<String> sensorTypes = sensorType != null ? 
Collections.singleton(sensorType) : null;
+    List<T> results = searchByGuids(Collections.singleton(guid), sensorTypes, 
callback);
+    if (results.size() > 0) {
+      return Optional.of(results.get(0));
+    } else {
+      return Optional.empty();
+    }
+  }
+
+  @Override
+  public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws 
IOException {
+    Collection<String> guids = new HashSet<>();
+    Collection<String> sensorTypes = new HashSet<>();
+    for (GetRequest getRequest: getRequests) {
+      guids.add(getRequest.getGuid());
+      sensorTypes.add(getRequest.getSensorType());
+    }
+    List<Document> documents = searchByGuids(
+        guids
+        , sensorTypes
+        , hit -> {
+          Long ts = 0L;
+          String doc = hit.getSourceAsString();
+          String sourceType = 
Iterables.getFirst(Splitter.on("_doc").split(hit.getType()), null);
+          try {
+            return Optional.of(new Document(doc, hit.getId(), sourceType, ts));
+          } catch (IOException e) {
+            throw new IllegalStateException("Unable to retrieve latest: " + 
e.getMessage(), e);
+          }
+        }
+
+    );
+    return documents;
+  }
+
+  /**
+   * Defers to a provided {@link org.elasticsearch.index.query.QueryBuilder} 
for the query.
+   * @param request The request defining the parameters of the search
+   * @param queryBuilder The actual query to be run. Intended for if the 
SearchRequest requires wrapping
+   * @return The results of the query
+   * @throws InvalidSearchException When the query is malformed or the current 
state doesn't allow search
+   */
+  protected SearchResponse search(SearchRequest request, QueryBuilder 
queryBuilder) throws InvalidSearchException {
+    org.elasticsearch.action.search.SearchRequest esRequest;
+    org.elasticsearch.action.search.SearchResponse esResponse;
+
+    if(client == null) {
+      throw new InvalidSearchException("Uninitialized Dao!  You must call 
init() prior to use.");
+    }
+
+    if (request.getSize() > accessConfig.getMaxSearchResults()) {
+      throw new InvalidSearchException("Search result size must be less than " 
+ accessConfig.getMaxSearchResults());
+    }
+
+    esRequest = buildSearchRequest(request, queryBuilder);
+    esResponse = requestSubmitter.submitSearch(esRequest);
+    return buildSearchResponse(request, esResponse);
+  }
+
+  /**
+   * Builds an Elasticsearch search request.
+   *
+   * The request carries paging (size/from), the query, one sort clause per requested
+   * sort field, source fetching, and one terms aggregation per requested facet field.
+   *
+   * @param searchRequest The Metron search request.
+   * @param queryBuilder The Elasticsearch query to run.
+   * @return An Elasticsearch search request.
+   * @throws InvalidSearchException If column metadata for the requested indices cannot be retrieved.
+   */
+  private org.elasticsearch.action.search.SearchRequest buildSearchRequest(
+      SearchRequest searchRequest,
+      QueryBuilder queryBuilder) throws InvalidSearchException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Got search request; request={}", ElasticsearchUtils.toJSON(searchRequest).orElse("???"));
+    }
+    SearchSourceBuilder searchBuilder = new SearchSourceBuilder()
+        .size(searchRequest.getSize())
+        .from(searchRequest.getFrom())
+        .query(queryBuilder)
+        .trackScores(true);
+    Optional<List<String>> fields = searchRequest.getFields();
+    // column metadata needed to understand the type of each sort field
+    Map<String, FieldType> meta;
+    try {
+      meta = columnMetadataDao.getColumnMetadata(searchRequest.getIndices());
+    } catch(IOException e) {
+      throw new InvalidSearchException("Unable to get column metadata", e);
+    }
+
+    // handle sort fields
+    for(SortField sortField : searchRequest.getSort()) {
+
+      // what type is the sort field? fields without metadata fall back to OTHER
+      FieldType sortFieldType = meta.getOrDefault(sortField.getField(), FieldType.OTHER);
+
+      // missing values are sorted last when descending, first otherwise
+      org.elasticsearch.search.sort.SortOrder sortOrder = getElasticsearchSortOrder(sortField.getSortOrder());
+      String missingSortOrder;
+      if(sortOrder == org.elasticsearch.search.sort.SortOrder.DESC) {
+        missingSortOrder = SORT_MISSING_LAST;
+      } else {
+        missingSortOrder = SORT_MISSING_FIRST;
+      }
+
+      // sort by the field, placing missing values as chosen above
+      FieldSortBuilder sortBy = new FieldSortBuilder(sortField.getField())
+          .order(sortOrder)
+          .missing(missingSortOrder)
+          .unmappedType(sortFieldType.getFieldType());
+      searchBuilder.sort(sortBy);
+    }
+
+    // handle search fields
+    if (fields.isPresent()) {
+      searchBuilder.fetchSource("*", null);
+    } else {
+      searchBuilder.fetchSource(true);
+    }
+
+    Optional<List<String>> facetFields = searchRequest.getFacetFields();
+
+    // handle facet fields
+    if (facetFields.isPresent()) {
+      // https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/_bucket_aggregations.html
+      for(String field : facetFields.get()) {
+        String name = getFacetAggregationName(field);
+        TermsAggregationBuilder terms = AggregationBuilders.terms(name).field(field);
+        searchBuilder.aggregation(terms);
+      }
+    }
+
+    // return the search request
+    String[] indices = wildcardIndices(searchRequest.getIndices());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Built Elasticsearch request; indices={}, request={}", indices, searchBuilder.toString());
+    }
+    return new org.elasticsearch.action.search.SearchRequest()
+        .indices(indices)
+        .source(searchBuilder);
+  }
+
+  /**
+   * Builds a search response.
+   *
+   * This effectively transforms an Elasticsearch search response into a Metron search response.
+   *
+   * @param searchRequest The Metron search request.
+   * @param esResponse The Elasticsearch search response.
+   * @return A Metron search response.
+   * @throws InvalidSearchException If facets were requested and the column metadata
+   *         used to format facet keys cannot be retrieved.
+   */
+  private SearchResponse buildSearchResponse(
+      SearchRequest searchRequest,
+      org.elasticsearch.action.search.SearchResponse esResponse) throws InvalidSearchException {
+
+    SearchResponse searchResponse = new SearchResponse();
+
+    searchResponse.setTotal(esResponse.getHits().getTotalHits());
+
+    // search hits --> search results
+    List<SearchResult> results = new ArrayList<>();
+    for(SearchHit hit: esResponse.getHits().getHits()) {
+      results.add(getSearchResult(hit, searchRequest.getFields()));
+    }
+    searchResponse.setResults(results);
+
+    // handle facet fields
+    if (searchRequest.getFacetFields().isPresent()) {
+      List<String> facetFields = searchRequest.getFacetFields().get();
+      Map<String, FieldType> commonColumnMetadata;
+      try {
+        commonColumnMetadata = columnMetadataDao.getColumnMetadata(searchRequest.getIndices());
+      } catch (IOException e) {
+        // keep the underlying failure as the cause so it is not lost to callers and logs
+        throw new InvalidSearchException(String.format(
+            "Could not get common column metadata for indices %s",
+            Arrays.toString(searchRequest.getIndices().toArray())), e);
+      }
+      searchResponse.setFacetCounts(getFacetCounts(facetFields, esResponse.getAggregations(), commonColumnMetadata));
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Built search response; response={}", ElasticsearchUtils.toJSON(searchResponse).orElse("???"));
+    }
+    return searchResponse;
+  }
+
+  private org.elasticsearch.search.sort.SortOrder getElasticsearchSortOrder(
+      org.apache.metron.indexing.dao.search.SortOrder sortOrder) {
+    return sortOrder == org.apache.metron.indexing.dao.search.SortOrder.DESC ?
+        org.elasticsearch.search.sort.SortOrder.DESC : 
org.elasticsearch.search.sort.SortOrder.ASC;
+  }
+
+  private String getFacetAggregationName(String field) {
+    return String.format("%s_count", field);
+  }
+
+  /**
+   * Converts index names into Elasticsearch wildcard patterns.
+   *
+   * Each name is suffixed with INDEX_NAME_DELIMITER followed by '*'.
+   *
+   * @param indices The index names; may be null.
+   * @return One pattern per index name, or an empty array if indices is null.
+   */
+  private String[] wildcardIndices(List<String> indices) {
+    if (indices == null) {
+      return new String[] {};
+    }
+
+    return indices
+        .stream()
+        .map(index -> String.format("%s%s*", index, INDEX_NAME_DELIMITER))
+        // let the stream supply the array size rather than ignoring the generator argument
+        .toArray(String[]::new);
+  }
+
+  private SearchResult getSearchResult(SearchHit searchHit, 
Optional<List<String>> fields) {
+    SearchResult searchResult = new SearchResult();
+    searchResult.setId(searchHit.getId());
+    Map<String, Object> source;
+    if (fields.isPresent()) {
+      Map<String, Object> resultSourceAsMap = searchHit.getSourceAsMap();
+      source = new HashMap<>();
+      fields.get().forEach(field -> {
+        source.put(field, resultSourceAsMap.get(field));
+      });
+    } else {
+      source = searchHit.getSource();
+    }
+    searchResult.setSource(source);
+    searchResult.setScore(searchHit.getScore());
+    searchResult.setIndex(searchHit.getIndex());
+    return searchResult;
+  }
+
+  private Map<String, Map<String, Long>> getFacetCounts(List<String> fields, 
Aggregations aggregations, Map<String, FieldType> commonColumnMetadata) {
+    Map<String, Map<String, Long>> fieldCounts = new HashMap<>();
+    for (String field: fields) {
+      Map<String, Long> valueCounts = new HashMap<>();
+      Aggregation aggregation = 
aggregations.get(getFacetAggregationName(field));
+      if (aggregation instanceof Terms) {
+        Terms terms = (Terms) aggregation;
+        terms.getBuckets().stream().forEach(bucket -> 
valueCounts.put(formatKey(bucket.getKey(), commonColumnMetadata.get(field)), 
bucket.getDocCount()));
+      }
+      fieldCounts.put(field, valueCounts);
+    }
+    return fieldCounts;
+  }
+
+  private String formatKey(Object key, FieldType type) {
+    if (FieldType.IP.equals(type) && key instanceof Long) {
+      return LegacyIpFieldMapper.longToIp((Long) key);
+    } else if (FieldType.BOOLEAN.equals(type)) {
+      return (Long) key == 1 ? "true" : "false";
+    } else {
+      return key.toString();
+    }
+  }
+
+  /**
+   * Defers to a provided {@link org.elasticsearch.index.query.QueryBuilder} 
for the query.
+   * @param groupRequest The request defining the parameters of the grouping
+   * @param queryBuilder The actual query to be run. Intended for if the 
SearchRequest requires wrapping
+   * @return The results of the query
+   * @throws InvalidSearchException When the query is malformed or the current 
state doesn't allow search
+   */
+  protected GroupResponse group(GroupRequest groupRequest, QueryBuilder 
queryBuilder)
+      throws InvalidSearchException {
+    org.elasticsearch.action.search.SearchRequest esRequest;
+    org.elasticsearch.action.search.SearchResponse esResponse;
+
+    if (client == null) {
+      throw new InvalidSearchException("Uninitialized Dao!  You must call 
init() prior to use.");
+    }
+    if (groupRequest.getGroups() == null || groupRequest.getGroups().size() == 
0) {
+      throw new InvalidSearchException("At least 1 group must be provided.");
+    }
+
+    esRequest = buildGroupRequest(groupRequest, queryBuilder);
+    esResponse = requestSubmitter.submitSearch(esRequest);
+    GroupResponse response = buildGroupResponse(groupRequest, esResponse);
+
+    return response;
+  }
+
+  /**
+   * Builds a group search request.
+   * @param groupRequest The Metron group request.
+   * @param queryBuilder The search query.
+   * @return An Elasticsearch search request.
+   */
+  private org.elasticsearch.action.search.SearchRequest buildGroupRequest(
+      GroupRequest groupRequest,
+      QueryBuilder queryBuilder) {
+
+    // handle groups
+    TermsAggregationBuilder groups = getGroupsTermBuilder(groupRequest, 0);
+    final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
+        .query(queryBuilder)
+        .aggregation(groups);
+
+    // return the search request
+    String[] indices = wildcardIndices(groupRequest.getIndices());
+    return new org.elasticsearch.action.search.SearchRequest()
+        .indices(indices)
+        .source(searchSourceBuilder);
+  }
+
+  private TermsAggregationBuilder getGroupsTermBuilder(GroupRequest 
groupRequest, int index) {
+    List<Group> groups = groupRequest.getGroups();
+    Group group = groups.get(index);
+    String aggregationName = getGroupByAggregationName(group.getField());
+    TermsAggregationBuilder termsBuilder = 
AggregationBuilders.terms(aggregationName);
+    termsBuilder
+        .field(group.getField())
+        .size(accessConfig.getMaxSearchGroups())
+        .order(getElasticsearchGroupOrder(group.getOrder()));
+    if (index < groups.size() - 1) {
+      termsBuilder.subAggregation(getGroupsTermBuilder(groupRequest, index + 
1));
+    }
+    Optional<String> scoreField = groupRequest.getScoreField();
+    if (scoreField.isPresent()) {
+      SumAggregationBuilder scoreSumAggregationBuilder = 
AggregationBuilders.sum(getSumAggregationName(scoreField.get())).field(scoreField.get()).missing(0);
+      termsBuilder.subAggregation(scoreSumAggregationBuilder);
+    }
+    return termsBuilder;
+  }
+
+  /**
+   * Derives the name of the terms aggregation used to group by a field.
+   * @param field The field being grouped on.
+   * @return The field name followed by {@code _group}.
+   */
+  private String getGroupByAggregationName(String field) {
+    return field + "_group";
+  }
+
+  /**
+   * Derives the name of the sum aggregation used to total a score field.
+   * @param field The score field being summed.
+   * @return The field name followed by {@code _score}.
+   */
+  private String getSumAggregationName(String field) {
+    return field + "_score";
+  }
+
+  private Order getElasticsearchGroupOrder(GroupOrder groupOrder) {
+    if (groupOrder.getGroupOrderType() == GroupOrderType.TERM) {
+      return groupOrder.getSortOrder() == SortOrder.ASC ? Order.term(true) : 
Order.term(false);
+    } else {
+      return groupOrder.getSortOrder() == SortOrder.ASC ? Order.count(true) : 
Order.count(false);
+    }
+  }
+
+  /**
+   * Build a group response.
+   *
+   * <p>The response is grouped by the first group of the request; its results
+   * are read from the aggregations of the Elasticsearch response.
+   * @param groupRequest The original group request.
+   * @param response The search response.
+   * @return A group response.
+   * @throws InvalidSearchException If the column metadata of the requested
+   *     indices cannot be retrieved; the underlying I/O failure is attached
+   *     as the cause.
+   */
+  private GroupResponse buildGroupResponse(
+      GroupRequest groupRequest,
+      org.elasticsearch.action.search.SearchResponse response) throws InvalidSearchException {
+
+    // the column metadata is passed on to getGroupResults, which uses it to format bucket keys
+    Map<String, FieldType> commonColumnMetadata;
+    try {
+      commonColumnMetadata = columnMetadataDao.getColumnMetadata(groupRequest.getIndices());
+    } catch (IOException e) {
+      // keep the original failure attached instead of discarding it
+      throw new InvalidSearchException(
+          String.format("Could not get common column metadata for indices %s",
+              Arrays.toString(groupRequest.getIndices().toArray())),
+          e);
+    }
+
+    GroupResponse groupResponse = new GroupResponse();
+    groupResponse.setGroupedBy(groupRequest.getGroups().get(0).getField());
+    groupResponse.setGroupResults(
+        getGroupResults(groupRequest, 0, response.getAggregations(), commonColumnMetadata));
+    return groupResponse;
+  }
+
+  private List<GroupResult> getGroupResults(GroupRequest groupRequest, int 
index, Aggregations aggregations, Map<String, FieldType> commonColumnMetadata) {
+    List<Group> groups = groupRequest.getGroups();
+    String field = groups.get(index).getField();
+    Terms terms = aggregations.get(getGroupByAggregationName(field));
+    List<GroupResult> searchResultGroups = new ArrayList<>();
+    for(Bucket bucket: terms.getBuckets()) {
+      GroupResult groupResult = new GroupResult();
+      groupResult.setKey(formatKey(bucket.getKey(), 
commonColumnMetadata.get(field)));
+      groupResult.setTotal(bucket.getDocCount());
+      Optional<String> scoreField = groupRequest.getScoreField();
+      if (scoreField.isPresent()) {
+        Sum score = 
bucket.getAggregations().get(getSumAggregationName(scoreField.get()));
+        groupResult.setScore(score.getValue());
+      }
+      if (index < groups.size() - 1) {
+        groupResult.setGroupedBy(groups.get(index + 1).getField());
+        groupResult.setGroupResults(getGroupResults(groupRequest, index + 1, 
bucket.getAggregations(), commonColumnMetadata));
+      }
+      searchResultGroups.add(groupResult);
+    }
+    return searchResultGroups;
+  }
+
+  /**
+   * Return the search hit based on the UUID and sensor type.
+   * A callback can be specified to transform the hit into a type T.
+   * If more than one hit happens, the first one will be returned.
+   */
+  <T> List<T> searchByGuids(Collection<String> guids, Collection<String> 
sensorTypes,
+      Function<SearchHit, Optional<T>> callback) {
+    if(guids == null || guids.isEmpty()) {
+      return Collections.EMPTY_LIST;
+    }
+    QueryBuilder query = null;
+    IdsQueryBuilder idsQuery = null;
+    if (sensorTypes != null) {
+      String[] types = sensorTypes.stream().map(sensorType -> sensorType + 
"_doc").toArray(String[]::new);
+      idsQuery = QueryBuilders.idsQuery(types);
+    } else {
+      idsQuery = QueryBuilders.idsQuery();
+    }
+
+    for(String guid : guids) {
+      query = idsQuery.addIds(guid);
+    }
+
+    SearchRequestBuilder request = client.prepareSearch()
+        .setQuery(query)
+        .setSize(guids.size())
+        ;
+    org.elasticsearch.action.search.SearchResponse response = request.get();
+    SearchHits hits = response.getHits();
+    List<T> results = new ArrayList<>();
+    for (SearchHit hit : hits) {
+      Optional<T> result = callback.apply(hit);
+      if (result.isPresent()) {
+        results.add(result.get());
+      }
+    }
+    return results;
+  }
+
+  private Optional<Document> toDocument(final String guid, SearchHit hit) {
+    Long ts = 0L;
+    String doc = hit.getSourceAsString();
+    String sourceType = toSourceType(hit.getType());
+    try {
+      return Optional.of(new Document(doc, guid, sourceType, ts));
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to retrieve latest: " + 
e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Returns the source type based on a given doc type.
+   * @param docType The document type.
+   * @return The source type.
+   */
+  private String toSourceType(String docType) {
+    return Iterables.getFirst(Splitter.on("_doc").split(docType), null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
new file mode 100644
index 0000000..a7c3a71
--- /dev/null
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.dao;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
+import org.apache.metron.indexing.dao.AccessConfig;
+import org.apache.metron.indexing.dao.update.Document;
+import org.apache.metron.indexing.dao.update.UpdateDao;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.index.IndexResponse;
+import 
org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
+import org.elasticsearch.client.transport.TransportClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ElasticsearchUpdateDao implements UpdateDao {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private transient TransportClient client;
+  private AccessConfig accessConfig;
+  private ElasticsearchSearchDao searchDao;
+
+  public ElasticsearchUpdateDao(TransportClient client,
+      AccessConfig accessConfig,
+      ElasticsearchSearchDao searchDao) {
+    this.client = client;
+    this.accessConfig = accessConfig;
+    this.searchDao = searchDao;
+  }
+
+  @Override
+  public void update(Document update, Optional<String> index) throws 
IOException {
+    String indexPostfix = ElasticsearchUtils
+        
.getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new 
Date());
+    String sensorType = update.getSensorType();
+    String indexName = getIndexName(update, index, indexPostfix);
+
+    IndexRequest indexRequest = buildIndexRequest(update, sensorType, 
indexName);
+    try {
+      IndexResponse response = client.index(indexRequest).get();
+
+      ShardInfo shardInfo = response.getShardInfo();
+      int failed = shardInfo.getFailed();
+      if (failed > 0) {
+        throw new IOException(
+            "ElasticsearchDao index failed: " + 
Arrays.toString(shardInfo.getFailures()));
+      }
+    } catch (Exception e) {
+      throw new IOException(e.getMessage(), e);
+    }
+  }
+
+  @Override
+  public void batchUpdate(Map<Document, Optional<String>> updates) throws 
IOException {
+    String indexPostfix = ElasticsearchUtils
+        
.getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new 
Date());
+
+    BulkRequestBuilder bulkRequestBuilder = client.prepareBulk();
+
+    // Get the indices we'll actually be using for each Document.
+    for (Map.Entry<Document, Optional<String>> updateEntry : 
updates.entrySet()) {
+      Document update = updateEntry.getKey();
+      String sensorType = update.getSensorType();
+      String indexName = getIndexName(update, updateEntry.getValue(), 
indexPostfix);
+      IndexRequest indexRequest = buildIndexRequest(
+          update,
+          sensorType,
+          indexName
+      );
+
+      bulkRequestBuilder.add(indexRequest);
+    }
+
+    BulkResponse bulkResponse = bulkRequestBuilder.get();
+    if (bulkResponse.hasFailures()) {
+      LOG.error("Bulk Request has failures: {}", 
bulkResponse.buildFailureMessage());
+      throw new IOException(
+          "ElasticsearchDao upsert failed: " + 
bulkResponse.buildFailureMessage());
+    }
+  }
+
+  protected String getIndexName(Document update, Optional<String> index, 
String indexPostFix) {
+    return index.orElse(getIndexName(update.getGuid(), update.getSensorType())
+        .orElse(ElasticsearchUtils.getIndexName(update.getSensorType(), 
indexPostFix, null))
+    );
+  }
+
+  protected Optional<String> getIndexName(String guid, String sensorType) {
+    return searchDao.searchByGuid(guid,
+        sensorType,
+        hit -> Optional.ofNullable(hit.getIndex())
+    );
+  }
+
+  protected IndexRequest buildIndexRequest(Document update, String sensorType, 
String indexName) {
+    String type = sensorType + "_doc";
+    Object ts = update.getTimestamp();
+    IndexRequest indexRequest = new IndexRequest(indexName, type, 
update.getGuid())
+        .source(update.getDocument());
+    if(ts != null) {
+      indexRequest = indexRequest.timestamp(ts.toString());
+    }
+
+    return indexRequest;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
index 2a6fb4f..ca1b860 100644
--- 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
@@ -27,6 +27,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import org.apache.metron.indexing.dao.AccessConfig;
+import org.apache.metron.indexing.dao.ColumnMetadataDao;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
@@ -79,7 +80,7 @@ public class ElasticsearchDaoTest {
     when(response.getHits()).thenReturn(searchHits);
 
     // provides column metadata
-    ColumnMetadataDao columnMetadataDao = mock(ColumnMetadataDao.class);
+    ElasticsearchColumnMetadataDao columnMetadataDao = 
mock(ElasticsearchColumnMetadataDao.class);
     when(columnMetadataDao.getColumnMetadata(any())).thenReturn(metadata);
 
     // returns the search response
@@ -92,7 +93,10 @@ public class ElasticsearchDaoTest {
     AccessConfig config = mock(AccessConfig.class);
     when(config.getMaxSearchResults()).thenReturn(maxSearchResults);
 
-    dao = new ElasticsearchDao(client, columnMetadataDao, requestSubmitter, 
config);
+    ElasticsearchSearchDao elasticsearchSearchDao = new 
ElasticsearchSearchDao(client, config, columnMetadataDao, requestSubmitter);
+    ElasticsearchUpdateDao elasticsearchUpdateDao = new 
ElasticsearchUpdateDao(client, config, elasticsearchSearchDao);
+
+    dao = new ElasticsearchDao(client, config, elasticsearchSearchDao, 
elasticsearchUpdateDao, columnMetadataDao, requestSubmitter);
   }
 
   private void setup(RestStatus status, int maxSearchResults) throws Exception 
{

http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
index 1bc5b6e..5569c54 100644
--- 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
@@ -19,7 +19,12 @@ package org.apache.metron.elasticsearch.integration;
 
 
 import java.io.File;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
 import java.io.IOException;
+import java.util.List;
+import java.util.Map;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
@@ -31,6 +36,14 @@ import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.IndexDao;
 import org.apache.metron.indexing.dao.SearchIntegrationTest;
 import org.apache.metron.indexing.dao.search.GetRequest;
+import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.GroupRequest;
+import org.apache.metron.indexing.dao.search.GroupResponse;
+import org.apache.metron.indexing.dao.search.GroupResult;
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
+import org.apache.metron.indexing.dao.search.SearchRequest;
+import org.apache.metron.indexing.dao.search.SearchResponse;
+import org.apache.metron.indexing.dao.search.SearchResult;
 import org.apache.metron.integration.InMemoryComponent;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
@@ -42,6 +55,13 @@ import org.json.simple.parser.JSONParser;
 import org.json.simple.parser.ParseException;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.concurrent.ExecutionException;
+import org.junit.Assert;
+import org.junit.Test;
+
 public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest {
 
   private static String indexDir = "target/elasticsearch_search";
@@ -238,5 +258,97 @@ public class ElasticsearchSearchIntegrationTest extends 
SearchIntegrationTest {
     }
   }
 
+  /**
+   * Running the search loaded from {@code badFacetQuery} must fail with an
+   * {@link InvalidSearchException} whose message contains "Failed to execute search".
+   */
+  @Test
+  public void bad_facet_query_throws_exception() throws Exception {
+    // expectations have to be registered before the failing call
+    thrown.expect(InvalidSearchException.class);
+    thrown.expectMessage("Failed to execute search");
+
+    dao.search(JSONUtils.INSTANCE.load(badFacetQuery, SearchRequest.class));
+  }
+
+
+
+  /**
+   * Checks the column metadata reported for a single sensor index at a time:
+   * first {@code bro} (13 fields expected), then {@code snort} (14 fields expected).
+   */
+  @Override
+  public void returns_column_metadata_for_specified_indices() throws Exception 
{
+    // getColumnMetadata with only bro
+    {
+      Map<String, FieldType> fieldTypes = 
dao.getColumnMetadata(Collections.singletonList("bro"));
+      Assert.assertEquals(13, fieldTypes.size());
+      Assert.assertEquals(FieldType.TEXT, fieldTypes.get("bro_field"));
+      Assert.assertEquals(FieldType.TEXT, 
fieldTypes.get("duplicate_name_field"));
+      Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("guid"));
+      Assert.assertEquals(FieldType.TEXT, fieldTypes.get("source:type"));
+      Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr"));
+      Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port"));
+      Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field"));
+      Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp"));
+      Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude"));
+      Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score"));
+      Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert"));
+      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point"));
+      // NOTE(review): the next two assertions repeat the bro_field and
+      // duplicate_name_field checks made above; they are redundant.
+      Assert.assertEquals(FieldType.TEXT, fieldTypes.get("bro_field"));
+      Assert.assertEquals(FieldType.TEXT, 
fieldTypes.get("duplicate_name_field"));
+      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("alert"));
+    }
+    // getColumnMetadata with only snort
+    {
+      Map<String, FieldType> fieldTypes = 
dao.getColumnMetadata(Collections.singletonList("snort"));
+      Assert.assertEquals(14, fieldTypes.size());
+      Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("snort_field"));
+      Assert.assertEquals(FieldType.INTEGER, 
fieldTypes.get("duplicate_name_field"));
+      Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("guid"));
+      Assert.assertEquals(FieldType.TEXT, fieldTypes.get("source:type"));
+      Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr"));
+      Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port"));
+      Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field"));
+      Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp"));
+      Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude"));
+      Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score"));
+      Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert"));
+      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point"));
+      // NOTE(review): repeats the duplicate_name_field check made above.
+      Assert.assertEquals(FieldType.INTEGER, 
fieldTypes.get("duplicate_name_field"));
+      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("alert"));
+    }
+  }
+
+  /**
+   * Checks the column metadata reported when {@code bro} and {@code snort} are
+   * requested together: 15 fields are expected.
+   */
+  @Override
+  public void returns_column_data_for_multiple_indices() throws Exception {
+    Map<String, FieldType> fieldTypes = 
dao.getColumnMetadata(Arrays.asList("bro", "snort"));
+    Assert.assertEquals(15, fieldTypes.size());
+    Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("guid"));
+    Assert.assertEquals(FieldType.TEXT, fieldTypes.get("source:type"));
+    Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr"));
+    Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port"));
+    Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field"));
+    Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp"));
+    Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude"));
+    Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score"));
+    Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert"));
+    Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point"));
+    Assert.assertEquals(FieldType.TEXT, fieldTypes.get("bro_field"));
+    Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("snort_field"));
+    // NOTE: This is because the field is in both bro and snort and they have
+    // different types.
+    Assert.assertEquals(FieldType.OTHER, 
fieldTypes.get("duplicate_name_field"));
+    Assert.assertEquals(FieldType.FLOAT, 
fieldTypes.get("threat:triage:score"));
+    Assert.assertEquals(FieldType.OTHER, fieldTypes.get("alert"));
+  }
 
+  /**
+   * Running the group request loaded from {@code badGroupQuery} must fail with an
+   * {@link InvalidSearchException} whose message contains "Failed to execute search".
+   */
+  @Test
+  public void throws_exception_on_aggregation_queries_on_non_string_non_numeric_fields()
+      throws Exception {
+    // expectations have to be registered before the failing call
+    thrown.expect(InvalidSearchException.class);
+    thrown.expectMessage("Failed to execute search");
+
+    dao.group(JSONUtils.INSTANCE.load(badGroupQuery, GroupRequest.class));
+  }
+
+  /**
+   * The search loaded from {@code differentTypeFilterQuery} must match exactly
+   * one document: the bro record whose {@code duplicate_name_field} is "data 1".
+   */
+  @Test
+  public void different_type_filter_query() throws Exception {
+    final SearchResponse response =
+        dao.search(JSONUtils.INSTANCE.load(differentTypeFilterQuery, SearchRequest.class));
+
+    Assert.assertEquals(1, response.getTotal());
+
+    final SearchResult onlyHit = response.getResults().get(0);
+    Assert.assertEquals("bro", onlyHit.getSource().get("source:type"));
+    Assert.assertEquals("data 1", onlyHit.getSource().get("duplicate_name_field"));
+  }
 }

Reply via email to