METRON-1301 Alerts UI - Sorting on Triage Score Unexpectedly Filters Some 
Records (nickwallen) closes apache/metron#832


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/4a089900
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/4a089900
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/4a089900

Branch: refs/heads/master
Commit: 4a089900ad477b8e261ab2a2a7d4cafeaec21eca
Parents: 768a6fa
Author: nickwallen <[email protected]>
Authored: Fri Nov 17 14:45:27 2017 -0500
Committer: nickallen <[email protected]>
Committed: Fri Nov 17 14:45:27 2017 -0500

----------------------------------------------------------------------
 .../CURRENT/package/files/bro_index.template    |   8 +-
 .../CURRENT/package/files/snort_index.template  |  39 +-
 .../CURRENT/package/files/yaf_index.template    |   6 +-
 .../apache/metron/rest/config/IndexConfig.java  |   6 +
 .../SearchControllerIntegrationTest.java        |  32 +-
 .../elasticsearch/dao/ColumnMetadataDao.java    |  67 +++
 .../dao/ElasticsearchColumnMetadataDao.java     | 179 ++++++++
 .../elasticsearch/dao/ElasticsearchDao.java     | 426 +++++++++++--------
 .../dao/ElasticsearchRequestSubmitter.java      | 138 ++++++
 .../elasticsearch/utils/ElasticsearchUtils.java |  52 +++
 .../dao/ElasticsearchColumnMetadataDaoTest.java | 144 +++++++
 .../elasticsearch/dao/ElasticsearchDaoTest.java | 240 +++++++----
 .../dao/ElasticsearchRequestSubmitterTest.java  | 121 ++++++
 .../ElasticsearchSearchIntegrationTest.java     | 154 ++++---
 .../matcher/SearchRequestMatcher.java           |  93 ----
 .../metron/indexing/dao/AccessConfig.java       |  16 +-
 .../apache/metron/indexing/dao/InMemoryDao.java |  31 +-
 .../indexing/dao/SearchIntegrationTest.java     | 154 ++++++-
 18 files changed, 1449 insertions(+), 457 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/4a089900/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template
index 7db006e..3a68d75 100644
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template
@@ -98,7 +98,7 @@
           "mapping": {
             "type": "float"
           },
-          "match": "threat.triage.rules:*:score",
+          "match": "threat:triage:*score",
           "match_mapping_type": "*"
         }
       },
@@ -107,7 +107,7 @@
           "mapping": {
             "type": "string"
           },
-          "match": "threat.triage.rules:*:reason",
+          "match": "threat:triage:rules:*:reason",
           "match_mapping_type": "*"
         }
       },
@@ -116,9 +116,9 @@
           "mapping": {
             "type": "string"
           },
-          "match": "threat.triage.rules:*:name",
+          "match": "threat:triage:rules:*:name",
           "match_mapping_type": "*"
-        }
+      }
       }
       ],
       "properties": {

http://git-wip-us.apache.org/repos/asf/metron/blob/4a089900/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/snort_index.template
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/snort_index.template
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/snort_index.template
index f13a9ee..7c6b401 100644
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/snort_index.template
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/snort_index.template
@@ -98,28 +98,28 @@
           "mapping": {
             "type": "float"
           },
-          "match": "threat.triage.rules:*:score",
+          "match": "threat:triage:*score",
           "match_mapping_type": "*"
         }
       },
-        {
-          "threat_triage_reason": {
-            "mapping": {
-              "type": "string"
-            },
-            "match": "threat.triage.rules:*:reason",
-            "match_mapping_type": "*"
-          }
-        },
-        {
-          "threat_triage_name": {
-            "mapping": {
-              "type": "string"
-            },
-            "match": "threat.triage.rules:*:name",
-            "match_mapping_type": "*"
-          }
+      {
+        "threat_triage_reason": {
+          "mapping": {
+            "type": "string"
+          },
+          "match": "threat:triage:rules:*:reason",
+          "match_mapping_type": "*"
         }
+      },
+      {
+        "threat_triage_name": {
+          "mapping": {
+            "type": "string"
+          },
+          "match": "threat:triage:rules:*:name",
+          "match_mapping_type": "*"
+        }
+      }
       ],
       "properties": {
         "timestamp": {
@@ -195,9 +195,6 @@
         "tcpwindow": {
           "type": "string"
         },
-        "threat:triage:level": {
-          "type": "double"
-        },
         "tos": {
           "type": "integer"
         },

http://git-wip-us.apache.org/repos/asf/metron/blob/4a089900/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/yaf_index.template
----------------------------------------------------------------------
diff --git 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/yaf_index.template
 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/yaf_index.template
index d84235d..d100eb0 100644
--- 
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/yaf_index.template
+++ 
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/yaf_index.template
@@ -98,7 +98,7 @@
           "mapping": {
             "type": "float"
           },
-          "match": "threat.triage.rules:*:score",
+          "match": "threat:triage:*score",
           "match_mapping_type": "*"
         }
       },
@@ -107,7 +107,7 @@
           "mapping": {
             "type": "string"
           },
-          "match": "threat.triage.rules:*:reason",
+          "match": "threat:triage:rules:*:reason",
           "match_mapping_type": "*"
         }
       },
@@ -116,7 +116,7 @@
           "mapping": {
             "type": "string"
           },
-          "match": "threat.triage.rules:*:name",
+          "match": "threat:triage:rules:*:name",
           "match_mapping_type": "*"
         }
       }

http://git-wip-us.apache.org/repos/asf/metron/blob/4a089900/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
index 4ce9644..25bb809 100644
--- 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java
@@ -34,6 +34,10 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.core.env.Environment;
 
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
 @Configuration
 public class IndexConfig {
 
@@ -57,6 +61,7 @@ public class IndexConfig {
       int searchMaxGroups = 
environment.getProperty(MetronRestConstants.SEARCH_MAX_GROUPS, Integer.class, 
1000);
       String metaDaoImpl = 
environment.getProperty(MetronRestConstants.META_DAO_IMPL, String.class, null);
       String metaDaoSort = 
environment.getProperty(MetronRestConstants.META_DAO_SORT, String.class, null);
+
       AccessConfig config = new AccessConfig();
       config.setMaxSearchResults(searchMaxResults);
       config.setMaxSearchGroups(searchMaxGroups);
@@ -84,6 +89,7 @@ public class IndexConfig {
       MetaAlertDao ret = (MetaAlertDao) IndexDaoFactory.create(metaDaoImpl, 
config).get(0);
       ret.init(indexDao, Optional.ofNullable(metaDaoSort));
       return ret;
+
     }
     catch(RuntimeException re) {
       throw re;

http://git-wip-us.apache.org/repos/asf/metron/blob/4a089900/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java
 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java
index 3673654..78a1e20 100644
--- 
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java
+++ 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java
@@ -89,8 +89,8 @@ public class SearchControllerIntegrationTest extends 
DaoControllerTest {
   public void setup() throws Exception {
     this.mockMvc = 
MockMvcBuilders.webAppContextSetup(this.wac).apply(springSecurity()).build();
     ImmutableMap<String, String> testData = ImmutableMap.of(
-        "bro_index_2017.01.01.01", SearchIntegrationTest.broData,
-        "snort_index_2017.01.01.01", SearchIntegrationTest.snortData
+            "bro_index_2017.01.01.01", SearchIntegrationTest.broData,
+            "snort_index_2017.01.01.01", SearchIntegrationTest.snortData
     );
     loadTestData(testData);
     loadColumnTypes();
@@ -114,19 +114,19 @@ public class SearchControllerIntegrationTest extends 
DaoControllerTest {
     }});
 
     assertEventually(() -> this.mockMvc.perform(post(searchUrl + 
"/search").with(httpBasic(user, 
password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(defaultQuery))
-        .andExpect(status().isOk())
-        
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
-        .andExpect(jsonPath("$.total").value(5))
-        .andExpect(jsonPath("$.results[0].source.source:type").value("bro"))
-        .andExpect(jsonPath("$.results[0].source.timestamp").value(5))
-        .andExpect(jsonPath("$.results[1].source.source:type").value("bro"))
-        .andExpect(jsonPath("$.results[1].source.timestamp").value(4))
-        .andExpect(jsonPath("$.results[2].source.source:type").value("bro"))
-        .andExpect(jsonPath("$.results[2].source.timestamp").value(3))
-        .andExpect(jsonPath("$.results[3].source.source:type").value("bro"))
-        .andExpect(jsonPath("$.results[3].source.timestamp").value(2))
-        .andExpect(jsonPath("$.results[4].source.source:type").value("bro"))
-        .andExpect(jsonPath("$.results[4].source.timestamp").value(1))
+            .andExpect(status().isOk())
+            
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+            .andExpect(jsonPath("$.total").value(5))
+            
.andExpect(jsonPath("$.results[0].source.source:type").value("bro"))
+            .andExpect(jsonPath("$.results[0].source.timestamp").value(5))
+            
.andExpect(jsonPath("$.results[1].source.source:type").value("bro"))
+            .andExpect(jsonPath("$.results[1].source.timestamp").value(4))
+            
.andExpect(jsonPath("$.results[2].source.source:type").value("bro"))
+            .andExpect(jsonPath("$.results[2].source.timestamp").value(3))
+            
.andExpect(jsonPath("$.results[3].source.source:type").value("bro"))
+            .andExpect(jsonPath("$.results[3].source.timestamp").value(2))
+            
.andExpect(jsonPath("$.results[4].source.source:type").value("bro"))
+            .andExpect(jsonPath("$.results[4].source.timestamp").value(1))
     );
 
     sensorIndexingConfigService.delete("bro");
@@ -288,4 +288,4 @@ public class SearchControllerIntegrationTest extends 
DaoControllerTest {
     columnTypes.put("snort", snortTypes);
     InMemoryDao.setColumnMetadata(columnTypes);
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/metron/blob/4a089900/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ColumnMetadataDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ColumnMetadataDao.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ColumnMetadataDao.java
new file mode 100644
index 0000000..0393629
--- /dev/null
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ColumnMetadataDao.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.dao;
+
+import org.apache.metron.indexing.dao.search.FieldType;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Responsible for retrieving column-level metadata about search indices.
+ */
+public interface ColumnMetadataDao {
+
+  /**
+   * Retrieves column metadata for one or more search indices.
+   * @param indices The search indices to retrieve column metadata for.
+   * @return The column metadata, one set for each search index.
+   * @throws IOException
+   */
+  Map<String, FieldType> getColumnMetadata(List<String> indices) throws 
IOException;
+
+  /**
+   * Finds the latest version of a set of base indices.  This can be used to 
find
+   * the latest 'bro' index, for example.
+   *
+   * Assuming the following indices exist...
+   *
+   *    [
+   *      'bro_index_2017.10.03.19'
+   *      'bro_index_2017.10.03.20',
+   *      'bro_index_2017.10.03.21',
+   *      'snort_index_2017.10.03.19',
+   *      'snort_index_2017.10.03.20',
+   *      'snort_index_2017.10.03.21'
+   *    ]
+   *
+   *  And the include indices are given as...
+   *
+   *    ['bro', 'snort']
+   *
+   * Then the latest indices are...
+   *
+   *    ['bro_index_2017.10.03.21', 'snort_index_2017.10.03.21']
+   *
+   * @param includeIndices The base names of the indices to include
+   * @return The latest version of a set of indices.
+   */
+  String[] getLatestIndices(List<String> includeIndices);
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/4a089900/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
new file mode 100644
index 0000000..8e210b4
--- /dev/null
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.dao;
+
+import org.apache.metron.indexing.dao.search.FieldType;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
+import org.elasticsearch.client.AdminClient;
+import org.elasticsearch.cluster.metadata.MappingMetaData;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER;
+
+/**
+ * Responsible for retrieving column-level metadata for Elasticsearch search 
indices.
+ */
+public class ElasticsearchColumnMetadataDao implements ColumnMetadataDao {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static Map<String, FieldType> elasticsearchTypeMap;
+  static {
+    Map<String, FieldType> fieldTypeMap = new HashMap<>();
+    fieldTypeMap.put("string", FieldType.STRING);
+    fieldTypeMap.put("ip", FieldType.IP);
+    fieldTypeMap.put("integer", FieldType.INTEGER);
+    fieldTypeMap.put("long", FieldType.LONG);
+    fieldTypeMap.put("date", FieldType.DATE);
+    fieldTypeMap.put("float", FieldType.FLOAT);
+    fieldTypeMap.put("double", FieldType.DOUBLE);
+    fieldTypeMap.put("boolean", FieldType.BOOLEAN);
+    elasticsearchTypeMap = Collections.unmodifiableMap(fieldTypeMap);
+  }
+
+  /**
+   * An Elasticsearch administrative client.
+   */
+  private transient AdminClient adminClient;
+
+  /**
+   * @param adminClient The Elasticsearch admin client.
+   */
+  public ElasticsearchColumnMetadataDao(AdminClient adminClient) {
+    this.adminClient = adminClient;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Map<String, FieldType> getColumnMetadata(List<String> indices) throws 
IOException {
+    Map<String, FieldType> indexColumnMetadata = new HashMap<>();
+    Map<String, String> previousIndices = new HashMap<>();
+    Set<String> fieldBlackList = new HashSet<>();
+
+    String[] latestIndices = getLatestIndices(indices);
+    if (latestIndices.length > 0) {
+      ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> 
mappings = adminClient
+              .indices()
+              .getMappings(new GetMappingsRequest().indices(latestIndices))
+              .actionGet()
+              .getMappings();
+
+      // for each index
+      for (Object key : mappings.keys().toArray()) {
+        String indexName = key.toString();
+        ImmutableOpenMap<String, MappingMetaData> mapping = 
mappings.get(indexName);
+
+        // for each mapping in the index
+        Iterator<String> mappingIterator = mapping.keysIt();
+        while (mappingIterator.hasNext()) {
+          MappingMetaData mappingMetaData = 
mapping.get(mappingIterator.next());
+          Map<String, Map<String, String>> map = (Map<String, Map<String, 
String>>) mappingMetaData
+                  .getSourceAsMap().get("properties");
+
+          // for each field in the mapping
+          for (String field : map.keySet()) {
+            if (!fieldBlackList.contains(field)) {
+              FieldType type = toFieldType(map.get(field).get("type"));
+
+              if(!indexColumnMetadata.containsKey(field)) {
+                indexColumnMetadata.put(field, type);
+
+                // record the index in which the field was first seen, to be able 
to print a helpful error message on type mismatch
+                previousIndices.put(field, indexName);
+
+              } else {
+                FieldType previousType = indexColumnMetadata.get(field);
+                if (!type.equals(previousType)) {
+                  String previousIndexName = previousIndices.get(field);
+                  LOG.error(String.format(
+                          "Field type mismatch: %s.%s has type %s while %s.%s 
has type %s.  Defaulting type to %s.",
+                          indexName, field, type.getFieldType(),
+                          previousIndexName, field, 
previousType.getFieldType(),
+                          FieldType.OTHER.getFieldType()));
+                  indexColumnMetadata.put(field, FieldType.OTHER);
+
+                  // the field is defined in multiple indices with different 
types; ignore the field as type has been set to OTHER
+                  fieldBlackList.add(field);
+                }
+              }
+            }
+          }
+        }
+      }
+    } else {
+      LOG.info(String.format("Unable to find any latest indices; indices=%s", 
indices));
+    }
+
+    return indexColumnMetadata;
+
+  }
+
+  /**
+   * Retrieves the latest indices.
+   * @param includeIndices The base names of the indices to include
+   * @return The latest version of each included index that exists
+   */
+  @Override
+  public String[] getLatestIndices(List<String> includeIndices) {
+    LOG.debug("Getting latest indices; indices={}", includeIndices);
+    Map<String, String> latestIndices = new HashMap<>();
+    String[] indices = adminClient
+            .indices()
+            .prepareGetIndex()
+            .setFeatures()
+            .get()
+            .getIndices();
+
+    for (String index : indices) {
+      int prefixEnd = index.indexOf(INDEX_NAME_DELIMITER);
+      if (prefixEnd != -1) {
+        String prefix = index.substring(0, prefixEnd);
+        if (includeIndices.contains(prefix)) {
+          String latestIndex = latestIndices.get(prefix);
+          if (latestIndex == null || index.compareTo(latestIndex) > 0) {
+            latestIndices.put(prefix, index);
+          }
+        }
+      }
+    }
+
+    return latestIndices.values().toArray(new String[latestIndices.size()]);
+  }
+
+  /**
+   * Converts a string type to the corresponding FieldType.
+   * @param type The type to convert.
+   * @return The corresponding FieldType or FieldType.OTHER, if no match.
+   */
+  private FieldType toFieldType(String type) {
+    return elasticsearchTypeMap.getOrDefault(type, FieldType.OTHER);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/4a089900/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
index 87ad7f7..910c09b 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
@@ -17,26 +17,8 @@
  */
 package org.apache.metron.elasticsearch.dao;
 
-import static 
org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER;
-
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.Function;
-import java.util.stream.Collectors;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.IndexDao;
@@ -52,19 +34,16 @@ import 
org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.search.SearchResult;
+import org.apache.metron.indexing.dao.search.SortField;
 import org.apache.metron.indexing.dao.search.SortOrder;
 import org.apache.metron.indexing.dao.update.Document;
 import org.elasticsearch.action.ActionWriteResponse.ShardInfo;
-import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
-import org.elasticsearch.action.search.SearchPhaseExecutionException;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.cluster.metadata.MappingMetaData;
-import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.index.mapper.ip.IpFieldMapper;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
@@ -80,18 +59,64 @@ import 
org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
 import org.elasticsearch.search.aggregations.metrics.sum.Sum;
 import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.elasticsearch.search.sort.FieldSortBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+
+import static 
org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER;
 
 public class ElasticsearchDao implements IndexDao {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * The value required to ensure that Elasticsearch sorts missing values last.
+   */
+  private static final String SORT_MISSING_LAST = "_last";
+
+  /**
+   * The value required to ensure that Elasticsearch sorts missing values first.
+   */
+  private static final String SORT_MISSING_FIRST = "_first";
+
+  /**
+   * The Elasticsearch client.
+   */
   private transient TransportClient client;
+
+  /**
+   * Retrieves column metadata about search indices.
+   */
+  private ColumnMetadataDao columnMetadataDao;
+
+  /**
+   * Handles the submission of search requests to Elasticsearch.
+   */
+  private ElasticsearchRequestSubmitter requestSubmitter;
+
   private AccessConfig accessConfig;
 
-  protected ElasticsearchDao(TransportClient client, AccessConfig config) {
+  protected ElasticsearchDao(TransportClient client,
+                             ColumnMetadataDao columnMetadataDao,
+                             ElasticsearchRequestSubmitter requestSubmitter,
+                             AccessConfig config) {
     this.client = client;
+    this.columnMetadataDao = columnMetadataDao;
+    this.requestSubmitter = requestSubmitter;
     this.accessConfig = config;
   }
 
@@ -99,21 +124,6 @@ public class ElasticsearchDao implements IndexDao {
     //uninitialized.
   }
 
-  private static Map<String, FieldType> elasticsearchSearchTypeMap;
-
-  static {
-    Map<String, FieldType> fieldTypeMap = new HashMap<>();
-    fieldTypeMap.put("string", FieldType.STRING);
-    fieldTypeMap.put("ip", FieldType.IP);
-    fieldTypeMap.put("integer", FieldType.INTEGER);
-    fieldTypeMap.put("long", FieldType.LONG);
-    fieldTypeMap.put("date", FieldType.DATE);
-    fieldTypeMap.put("float", FieldType.FLOAT);
-    fieldTypeMap.put("double", FieldType.DOUBLE);
-    fieldTypeMap.put("boolean", FieldType.BOOLEAN);
-    elasticsearchSearchTypeMap = Collections.unmodifiableMap(fieldTypeMap);
-  }
-
   @Override
   public SearchResponse search(SearchRequest searchRequest) throws 
InvalidSearchException {
     return search(searchRequest, new 
QueryStringQueryBuilder(searchRequest.getQuery()));
@@ -121,56 +131,139 @@ public class ElasticsearchDao implements IndexDao {
 
   /**
    * Defers to a provided {@link org.elasticsearch.index.query.QueryBuilder} 
for the query.
-   * @param searchRequest The request defining the parameters of the search
+   * @param request The request defining the parameters of the search
    * @param queryBuilder The actual query to be run. Intended for if the 
SearchRequest requires wrapping
    * @return The results of the query
    * @throws InvalidSearchException When the query is malformed or the current 
state doesn't allow search
    */
-  protected SearchResponse search(SearchRequest searchRequest, QueryBuilder 
queryBuilder) throws InvalidSearchException {
+  protected SearchResponse search(SearchRequest request, QueryBuilder 
queryBuilder) throws InvalidSearchException {
+    org.elasticsearch.action.search.SearchRequest esRequest;
+    org.elasticsearch.action.search.SearchResponse esResponse;
+
     if(client == null) {
       throw new InvalidSearchException("Uninitialized Dao!  You must call 
init() prior to use.");
     }
-    if (searchRequest.getSize() > accessConfig.getMaxSearchResults()) {
+
+    if (request.getSize() > accessConfig.getMaxSearchResults()) {
       throw new InvalidSearchException("Search result size must be less than " 
+ accessConfig.getMaxSearchResults());
     }
-    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
+
+    esRequest = buildSearchRequest(request, queryBuilder);
+    esResponse = requestSubmitter.submitSearch(esRequest);
+    return buildSearchResponse(request, esResponse);
+  }
+
+  /**
+   * Builds an Elasticsearch search request.
+   * @param searchRequest The Metron search request.
+   * @param queryBuilder The actual query to be run.
+   * @return An Elasticsearch search request.
+   */
+  private org.elasticsearch.action.search.SearchRequest buildSearchRequest(
+          SearchRequest searchRequest,
+          QueryBuilder queryBuilder) throws InvalidSearchException {
+
+    LOG.debug("Got search request; request={}", 
ElasticsearchUtils.toJSON(searchRequest).orElse("???"));
+    SearchSourceBuilder searchBuilder = new SearchSourceBuilder()
             .size(searchRequest.getSize())
             .from(searchRequest.getFrom())
             .query(queryBuilder)
             .trackScores(true);
-    searchRequest.getSort().forEach(sortField -> 
searchSourceBuilder.sort(sortField.getField(), 
getElasticsearchSortOrder(sortField.getSortOrder())));
-    Optional<List<String>> fields = searchRequest.getFields();
-    if (fields.isPresent()) {
-      searchSourceBuilder.fields(fields.get());
-    } else {
-      searchSourceBuilder.fetchSource(true);
+
+    // column metadata needed to understand the type of each sort field
+    Map<String, FieldType> meta;
+    try {
+      meta = getColumnMetadata(searchRequest.getIndices());
+    } catch(IOException e) {
+      throw new InvalidSearchException("Unable to get column metadata", e);
     }
-    Optional<List<String>> facetFields = searchRequest.getFacetFields();
-    if (facetFields.isPresent()) {
-      facetFields.get().forEach(field -> searchSourceBuilder.aggregation(new 
TermsBuilder(getFacentAggregationName(field)).field(field)));
+
+    // handle sort fields
+    for(SortField sortField : searchRequest.getSort()) {
+
+      // what type is the sort field?
+      FieldType sortFieldType = meta.getOrDefault(sortField.getField(), 
FieldType.OTHER);
+
+      // sort order - if descending, missing values sorted last. otherwise, 
missing values sorted first
+      org.elasticsearch.search.sort.SortOrder sortOrder = 
getElasticsearchSortOrder(sortField.getSortOrder());
+      String missingSortOrder;
+      if(sortOrder == org.elasticsearch.search.sort.SortOrder.DESC) {
+        missingSortOrder = SORT_MISSING_LAST;
+      } else {
+        missingSortOrder = SORT_MISSING_FIRST;
+      }
+
+      // sort by the field - placement of missing values depends on sort order
+      FieldSortBuilder sortBy = new FieldSortBuilder(sortField.getField())
+              .order(sortOrder)
+              .missing(missingSortOrder)
+              .unmappedType(sortFieldType.getFieldType());
+      searchBuilder.sort(sortBy);
     }
-    String[] wildcardIndices = wildcardIndices(searchRequest.getIndices());
-    org.elasticsearch.action.search.SearchResponse elasticsearchResponse;
-    try {
-      elasticsearchResponse = client.search(new 
org.elasticsearch.action.search.SearchRequest(wildcardIndices)
-              .source(searchSourceBuilder)).actionGet();
-    } catch (SearchPhaseExecutionException e) {
-      LOG.error("Could not execute search", e);
-      throw new InvalidSearchException("Could not execute search", e);
+
+    // handle search fields
+    if (searchRequest.getFields().isPresent()) {
+      searchBuilder.fields(searchRequest.getFields().get());
+    } else {
+      searchBuilder.fetchSource(true);
     }
+
+    // handle facet fields
+    if (searchRequest.getFacetFields().isPresent()) {
+      for(String field : searchRequest.getFacetFields().get()) {
+        String name = getFacentAggregationName(field);
+        TermsBuilder terms = new TermsBuilder(name).field(field);
+        searchBuilder.aggregation(terms);
+      }
+    }
+
+    // return the search request
+    String[] indices = wildcardIndices(searchRequest.getIndices());
+    LOG.debug("Built Elasticsearch request; indices={}, request={}", indices, 
searchBuilder.toString());
+    return new org.elasticsearch.action.search.SearchRequest()
+            .indices(indices)
+            .source(searchBuilder);
+  }
+
+  /**
+   * Builds a search response.
+   *
+   * This effectively transforms an Elasticsearch search response into a 
Metron search response.
+   *
+   * @param searchRequest The Metron search request.
+   * @param esResponse The Elasticsearch search response.
+   * @return A Metron search response.
+   * @throws InvalidSearchException
+   */
+  private SearchResponse buildSearchResponse(
+          SearchRequest searchRequest,
+          org.elasticsearch.action.search.SearchResponse esResponse) throws 
InvalidSearchException {
+
     SearchResponse searchResponse = new SearchResponse();
-    searchResponse.setTotal(elasticsearchResponse.getHits().getTotalHits());
-    
searchResponse.setResults(Arrays.stream(elasticsearchResponse.getHits().getHits()).map(searchHit
 ->
-        getSearchResult(searchHit, 
fields.isPresent())).collect(Collectors.toList()));
-    if (facetFields.isPresent()) {
+    searchResponse.setTotal(esResponse.getHits().getTotalHits());
+
+    // search hits --> search results
+    List<SearchResult> results = new ArrayList<>();
+    for(SearchHit hit: esResponse.getHits().getHits()) {
+      results.add(getSearchResult(hit, searchRequest.getFields().isPresent()));
+    }
+    searchResponse.setResults(results);
+
+    // handle facet fields
+    if (searchRequest.getFacetFields().isPresent()) {
+      List<String> facetFields = searchRequest.getFacetFields().get();
       Map<String, FieldType> commonColumnMetadata;
       try {
         commonColumnMetadata = getColumnMetadata(searchRequest.getIndices());
       } catch (IOException e) {
-        throw new InvalidSearchException(String.format("Could not get common 
column metadata for indices %s", 
Arrays.toString(searchRequest.getIndices().toArray())));
+        throw new InvalidSearchException(String.format(
+                "Could not get common column metadata for indices %s",
+                Arrays.toString(searchRequest.getIndices().toArray())));
       }
-      searchResponse.setFacetCounts(getFacetCounts(facetFields.get(), 
elasticsearchResponse.getAggregations(), commonColumnMetadata ));
+      searchResponse.setFacetCounts(getFacetCounts(facetFields, 
esResponse.getAggregations(), commonColumnMetadata ));
     }
+
+    LOG.debug("Built search response; response={}", 
ElasticsearchUtils.toJSON(searchResponse).orElse("???"));
     return searchResponse;
   }
 
@@ -188,42 +281,76 @@ public class ElasticsearchDao implements IndexDao {
    */
   protected GroupResponse group(GroupRequest groupRequest, QueryBuilder 
queryBuilder)
       throws InvalidSearchException {
+    org.elasticsearch.action.search.SearchRequest esRequest;
+    org.elasticsearch.action.search.SearchResponse esResponse;
+
     if (client == null) {
       throw new InvalidSearchException("Uninitialized Dao!  You must call 
init() prior to use.");
     }
     if (groupRequest.getGroups() == null || groupRequest.getGroups().size() == 
0) {
       throw new InvalidSearchException("At least 1 group must be provided.");
     }
-    final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
-    searchSourceBuilder.query(queryBuilder);
-    searchSourceBuilder.aggregation(getGroupsTermBuilder(groupRequest, 0));
-    String[] wildcardIndices = wildcardIndices(groupRequest.getIndices());
-    org.elasticsearch.action.search.SearchRequest request;
-    org.elasticsearch.action.search.SearchResponse response;
 
-    try {
-      request = new 
org.elasticsearch.action.search.SearchRequest(wildcardIndices)
-          .source(searchSourceBuilder);
-      response = client.search(request).actionGet();
-    } catch (SearchPhaseExecutionException e) {
-      throw new InvalidSearchException("Could not execute search", e);
-    }
+    esRequest = buildGroupRequest(groupRequest, queryBuilder);
+    esResponse = requestSubmitter.submitSearch(esRequest);
+    GroupResponse response = buildGroupResponse(groupRequest, esResponse);
+
+    return response;
+  }
+
+  /**
+   * Builds a group search request.
+   * @param groupRequest The Metron group request.
+   * @param queryBuilder The search query.
+   * @return An Elasticsearch search request.
+   */
+  private org.elasticsearch.action.search.SearchRequest buildGroupRequest(
+          GroupRequest groupRequest,
+          QueryBuilder queryBuilder) {
+
+    // handle groups
+    TermsBuilder groups = getGroupsTermBuilder(groupRequest, 0);
+    final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
+            .query(queryBuilder)
+            .aggregation(groups);
+
+    // return the search request
+    String[] indices = wildcardIndices(groupRequest.getIndices());
+    return new org.elasticsearch.action.search.SearchRequest()
+            .indices(indices)
+            .source(searchSourceBuilder);
+  }
+
+  /**
+   * Build a group response.
+   * @param groupRequest The original group request.
+   * @param response The search response.
+   * @return A group response.
+   * @throws InvalidSearchException
+   */
+  private GroupResponse buildGroupResponse(
+          GroupRequest groupRequest,
+          org.elasticsearch.action.search.SearchResponse response) throws 
InvalidSearchException {
+
+    // build the search response
     Map<String, FieldType> commonColumnMetadata;
     try {
       commonColumnMetadata = getColumnMetadata(groupRequest.getIndices());
     } catch (IOException e) {
-      throw new InvalidSearchException(String
-          .format("Could not get common column metadata for indices %s",
+      throw new InvalidSearchException(String.format("Could not get common 
column metadata for indices %s",
               Arrays.toString(groupRequest.getIndices().toArray())));
     }
+
     GroupResponse groupResponse = new GroupResponse();
     groupResponse.setGroupedBy(groupRequest.getGroups().get(0).getField());
-    groupResponse.setGroupResults(
-        getGroupResults(groupRequest, 0, response.getAggregations(), 
commonColumnMetadata));
+    groupResponse.setGroupResults(getGroupResults(groupRequest, 0, 
response.getAggregations(), commonColumnMetadata));
     return groupResponse;
   }
 
   private String[] wildcardIndices(List<String> indices) {
+    if(indices == null)
+      return new String[] {};
+
     return indices
             .stream()
             .map(index -> String.format("%s%s*", index, INDEX_NAME_DELIMITER))
@@ -235,26 +362,43 @@ public class ElasticsearchDao implements IndexDao {
     if(this.client == null) {
       this.client = 
ElasticsearchUtils.getClient(config.getGlobalConfigSupplier().get(), 
config.getOptionalSettings());
       this.accessConfig = config;
+      this.columnMetadataDao = new 
ElasticsearchColumnMetadataDao(this.client.admin());
+      this.requestSubmitter = new ElasticsearchRequestSubmitter(this.client);
+    }
+
+    if(columnMetadataDao == null) {
+      throw new IllegalArgumentException("No ColumnMetadataDao available");
+    }
+
+    if(requestSubmitter == null) {
+      throw new IllegalArgumentException("No ElasticsearchRequestSubmitter 
available");
     }
   }
 
   @Override
   public Document getLatest(final String guid, final String sensorType) throws 
IOException {
-    Optional<Document> ret = searchByGuid(
-            guid
-            , sensorType
-            , hit -> {
-              Long ts = 0L;
-              String doc = hit.getSourceAsString();
-              String sourceType = 
Iterables.getFirst(Splitter.on("_doc").split(hit.getType()), null);
-              try {
-                return Optional.of(new Document(doc, guid, sourceType, ts));
-              } catch (IOException e) {
-                throw new IllegalStateException("Unable to retrieve latest: " 
+ e.getMessage(), e);
-              }
-            }
-            );
-    return ret.orElse(null);
+    Optional<Document> doc = searchByGuid(guid, sensorType, hit -> 
toDocument(guid, hit));
+    return doc.orElse(null);
+  }
+
+  private Optional<Document> toDocument(final String guid, SearchHit hit) {
+    Long ts = 0L;
+    String doc = hit.getSourceAsString();
+    String sourceType = toSourceType(hit.getType());
+    try {
+      return Optional.of(new Document(doc, guid, sourceType, ts));
+    } catch (IOException e) {
+      throw new IllegalStateException("Unable to retrieve latest: " + 
e.getMessage(), e);
+    }
+  }
+
+  /**
+   * Returns the source type based on a given doc type.
+   * @param docType The document type.
+   * @return The source type.
+   */
+  private String toSourceType(String docType) {
+    return Iterables.getFirst(Splitter.on("_doc").split(docType), null);
   }
 
   @Override
@@ -394,8 +538,7 @@ public class ElasticsearchDao implements IndexDao {
     String type = sensorType + "_doc";
     Object ts = update.getTimestamp();
     IndexRequest indexRequest = new IndexRequest(indexName, type, 
update.getGuid())
-        .source(update.getDocument())
-        ;
+        .source(update.getDocument());
     if(ts != null) {
       indexRequest = indexRequest.timestamp(ts.toString());
     }
@@ -403,77 +546,9 @@ public class ElasticsearchDao implements IndexDao {
     return indexRequest;
   }
 
-  @SuppressWarnings("unchecked")
   @Override
   public Map<String, FieldType> getColumnMetadata(List<String> indices) throws 
IOException {
-    Map<String, FieldType> indexColumnMetadata = new HashMap<>();
-
-    // Keep track of the last index used to inspect a field type so we can 
print a helpful error message on type mismatch
-    Map<String, String> previousIndices = new HashMap<>();
-    // If we have detected a field type mismatch, ignore the field going 
forward since the type has been set to OTHER
-    Set<String> fieldBlackList = new HashSet<>();
-
-    String[] latestIndices = getLatestIndices(indices);
-    if (latestIndices.length > 0) {
-      ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> 
mappings = client
-          .admin()
-          .indices()
-          .getMappings(new GetMappingsRequest().indices(latestIndices))
-          .actionGet()
-          .getMappings();
-      for (Object key : mappings.keys().toArray()) {
-        String indexName = key.toString();
-        ImmutableOpenMap<String, MappingMetaData> mapping = 
mappings.get(indexName);
-        Iterator<String> mappingIterator = mapping.keysIt();
-        while (mappingIterator.hasNext()) {
-          MappingMetaData mappingMetaData = 
mapping.get(mappingIterator.next());
-          Map<String, Map<String, String>> map = (Map<String, Map<String, 
String>>) mappingMetaData
-              .getSourceAsMap().get("properties");
-          for (String field : map.keySet()) {
-            if (!fieldBlackList.contains(field)) {
-              FieldType type = elasticsearchSearchTypeMap
-                  .getOrDefault(map.get(field).get("type"), FieldType.OTHER);
-              if (indexColumnMetadata.containsKey(field)) {
-                FieldType previousType = indexColumnMetadata.get(field);
-                if (!type.equals(previousType)) {
-                  String previousIndexName = previousIndices.get(field);
-                  LOG.error(String.format(
-                      "Field type mismatch: %s.%s has type %s while %s.%s has 
type %s.  Defaulting type to %s.",
-                      indexName, field, type.getFieldType(),
-                      previousIndexName, field, previousType.getFieldType(),
-                      FieldType.OTHER.getFieldType()));
-                  indexColumnMetadata.put(field, FieldType.OTHER);
-                  // Detected a type mismatch so ignore the field from now on
-                  fieldBlackList.add(field);
-                }
-              } else {
-                indexColumnMetadata.put(field, type);
-                previousIndices.put(field, indexName);
-              }
-            }
-          }
-        }
-      }
-    }
-    return indexColumnMetadata;
-  }
-
-  protected String[] getLatestIndices(List<String> includeIndices) {
-    Map<String, String> latestIndices = new HashMap<>();
-    String[] indices = 
client.admin().indices().prepareGetIndex().setFeatures().get().getIndices();
-    for (String index : indices) {
-      int prefixEnd = index.indexOf(INDEX_NAME_DELIMITER);
-      if (prefixEnd != -1) {
-        String prefix = index.substring(0, prefixEnd);
-        if (includeIndices.contains(prefix)) {
-          String latestIndex = latestIndices.get(prefix);
-          if (latestIndex == null || index.compareTo(latestIndex) > 0) {
-            latestIndices.put(prefix, index);
-          }
-        }
-      }
-    }
-    return latestIndices.values().toArray(new String[latestIndices.size()]);
+    return columnMetadataDao.getColumnMetadata(indices);
   }
 
   private org.elasticsearch.search.sort.SortOrder getElasticsearchSortOrder(
@@ -588,4 +663,19 @@ public class ElasticsearchDao implements IndexDao {
   private String getSumAggregationName(String field) {
     return String.format("%s_score", field);
   }
+
+  public ElasticsearchDao client(TransportClient client) {
+    this.client = client;
+    return this;
+  }
+
+  public ElasticsearchDao columnMetadataDao(ColumnMetadataDao 
columnMetadataDao) {
+    this.columnMetadataDao = columnMetadataDao;
+    return this;
+  }
+
+  public ElasticsearchDao accessConfig(AccessConfig accessConfig) {
+    this.accessConfig = accessConfig;
+    return this;
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/4a089900/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java
new file mode 100644
index 0000000..0e0df21
--- /dev/null
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.dao;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
+import org.elasticsearch.action.search.SearchPhaseExecutionException;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.ShardSearchFailure;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.rest.RestStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+
+/**
+ * Responsible for submitting requests to Elasticsearch.
+ */
+public class ElasticsearchRequestSubmitter {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  /**
+   * The Elasticsearch client.
+   */
+  private TransportClient client;
+
+  public ElasticsearchRequestSubmitter(TransportClient client) {
+    this.client = client;
+  }
+
+  /**
+   * Submit a search to Elasticsearch.
+   * @param request A search request.
+   * @return The search response.
+   */
+  public SearchResponse submitSearch(SearchRequest request) throws 
InvalidSearchException {
+    LOG.debug("About to submit a search; request={}", 
ElasticsearchUtils.toJSON(request).orElse("???"));
+
+    // submit the search request
+    org.elasticsearch.action.search.SearchResponse esResponse;
+    try {
+      esResponse = client
+              .search(request)
+              .actionGet();
+      LOG.debug("Got Elasticsearch response; response={}", 
esResponse.toString());
+
+    } catch (SearchPhaseExecutionException e) {
+      String msg = String.format(
+              "Failed to execute search; error='%s', search='%s'",
+              ExceptionUtils.getRootCauseMessage(e),
+              ElasticsearchUtils.toJSON(request).orElse("???"));
+      LOG.error(msg, e);
+      throw new InvalidSearchException(msg, e);
+    }
+
+    // check for shard failures
+    if(esResponse.getFailedShards() > 0) {
+      handleShardFailures(request, esResponse);
+    }
+
+    // validate the response status
+    if(RestStatus.OK == esResponse.status()) {
+      return esResponse;
+
+    } else {
+      // the search was not successful
+      String msg = String.format(
+              "Bad search response; status=%s, timeout=%s, terminatedEarly=%s",
+              esResponse.status(), esResponse.isTimedOut(), 
esResponse.isTerminatedEarly());
+      LOG.error(msg);
+      throw new InvalidSearchException(msg);
+    }
+  }
+
+  /**
+   * Handle individual shard failures that can occur even when the response is 
OK.  These
+   * can indicate misconfiguration of the search indices.
+   * @param request The search request.
+   * @param response  The search response.
+   */
+  private void handleShardFailures(
+          org.elasticsearch.action.search.SearchRequest request,
+          org.elasticsearch.action.search.SearchResponse response) {
+    /*
+     * shard failures are only logged.  the search itself is not failed.  this 
approach
+     * assumes that a user is interested in partial search results, even if the
+     * entire search result set cannot be produced.
+     *
+     * for example, assume the user adds an additional sensor and the telemetry
+     * is indexed into a new search index.  if that search index is 
misconfigured,
+     * it can result in partial shard failures.  rather than failing the 
entire search,
+     * we log the error and allow the results to be returned from shards that
+     * are correctly configured.
+     */
+    int errors = ArrayUtils.getLength(response.getShardFailures());
+    LOG.error("Search resulted in {}/{} shards failing; errors={}, search={}",
+            response.getFailedShards(),
+            response.getTotalShards(),
+            errors,
+            ElasticsearchUtils.toJSON(request).orElse("???"));
+
+    // log each reported failure
+    int failureCount=1;
+    for(ShardSearchFailure fail: response.getShardFailures()) {
+      String msg = String.format(
+              "Shard search failure [%s/%s]; reason=%s, index=%s, shard=%s, 
status=%s, nodeId=%s",
+              failureCount,
+              errors,
+              ExceptionUtils.getRootCauseMessage(fail.getCause()),
+              fail.index(),
+              fail.shardId(),
+              fail.status(),
+              fail.shard().getNodeId());
+      LOG.error(msg, fail.getCause());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/4a089900/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
index 4c9933b..f29012a 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
@@ -22,10 +22,15 @@ import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import org.apache.commons.lang.StringUtils;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.lang.invoke.MethodHandles;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.text.SimpleDateFormat;
@@ -35,11 +40,14 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static java.lang.String.format;
 
 public class ElasticsearchUtils {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
   private static ThreadLocal<Map<String, SimpleDateFormat>> DATE_FORMAT_CACHE
           = ThreadLocal.withInitial(() -> new HashMap<>());
 
@@ -179,4 +187,48 @@ public class ElasticsearchUtils {
     }
     throw new IllegalStateException("Unable to read the elasticsearch ips, 
expected es.ip to be either a list of strings, a string hostname or a host:port 
string");
   }
+
+  /**
+   * Converts an Elasticsearch SearchRequest to JSON.
+   * @param esRequest The search request.
+   * @return The JSON representation of the SearchRequest.
+   */
+  public static Optional<String> 
toJSON(org.elasticsearch.action.search.SearchRequest esRequest) {
+    Optional<String> json = Optional.empty();
+
+    if(esRequest != null) {
+      try {
+        json = Optional.of(XContentHelper.convertToJson(esRequest.source(), 
true));
+
+      } catch (Throwable t) {
+        LOG.error("Failed to convert search request to JSON", t);
+      }
+    }
+
+    return json;
+  }
+
+  /**
+   * Converts an arbitrary object to JSON.
+   * @param request The object to convert; for example, a search request or response.
+   * @return The JSON representation of the object; empty if it is null or cannot be serialized.
+   */
+  public static Optional<String> toJSON(Object request) {
+    Optional<String> json = Optional.empty();
+
+    if(request != null) {
+      try {
+        json = Optional.of(
+                new ObjectMapper()
+                        .writer()
+                        .withDefaultPrettyPrinter()
+                        .writeValueAsString(request));
+
+      } catch (Throwable t) {
+        LOG.error("Failed to convert request to JSON", t);
+      }
+    }
+
+    return json;
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/4a089900/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java
new file mode 100644
index 0000000..0a83ee0
--- /dev/null
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.dao;
+
+import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.admin.indices.get.GetIndexRequestBuilder;
+import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
+import org.elasticsearch.client.AdminClient;
+import org.elasticsearch.client.IndicesAdminClient;
+import org.elasticsearch.cluster.metadata.MappingMetaData;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests the ElasticsearchColumnMetadataDao class.
+ */
+public class ElasticsearchColumnMetadataDaoTest {
+
+  /**
+   * Creates a DAO whose mocked admin client reports the given indices and
+   * an empty set of mappings.
+   *
+   * @param indices The names of all indices that will exist.
+   * @return An object to test.
+   */
+  public ElasticsearchColumnMetadataDao setup(String[] indices) {
+    return setup(indices, ImmutableOpenMap.of());
+  }
+
+  /**
+   * Creates a DAO whose mocked admin client reports the given indices and
+   * the given mappings.
+   *
+   * @param indices The names of all indices that will exist.
+   * @param mappings The index mappings.
+   * @return An object to test.
+   */
+  public ElasticsearchColumnMetadataDao setup(
+          String[] indices,
+          ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings) {
+
+    AdminClient adminClient = mock(AdminClient.class);
+    IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class);
+    GetIndexRequestBuilder getIndexRequestBuilder = mock(GetIndexRequestBuilder.class);
+    GetIndexResponse getIndexResponse = mock(GetIndexResponse.class);
+    GetMappingsResponse getMappingsResponse = mock(GetMappingsResponse.class);
+
+    // mock() can only hand back the raw type, so the conversion is unchecked
+    @SuppressWarnings("unchecked")
+    ActionFuture<GetMappingsResponse> getMappingsActionFuture = mock(ActionFuture.class);
+
+    // setup the mocks so that a set of indices are available to the DAO
+    when(adminClient.indices()).thenReturn(indicesAdminClient);
+    when(indicesAdminClient.prepareGetIndex()).thenReturn(getIndexRequestBuilder);
+    when(getIndexRequestBuilder.setFeatures()).thenReturn(getIndexRequestBuilder);
+    when(getIndexRequestBuilder.get()).thenReturn(getIndexResponse);
+    when(getIndexResponse.getIndices()).thenReturn(indices);
+
+    // setup the mocks so that a set of mappings are available to the DAO
+    when(indicesAdminClient.getMappings(any())).thenReturn(getMappingsActionFuture);
+    when(getMappingsActionFuture.actionGet()).thenReturn(getMappingsResponse);
+    when(getMappingsResponse.getMappings()).thenReturn(mappings);
+
+    return new ElasticsearchColumnMetadataDao(adminClient);
+  }
+
+  /**
+   * Only the most recent index of the one requested sensor should be returned.
+   */
+  @Test
+  public void testGetOneLatestIndex() {
+
+    // setup
+    String[] existingIndices = new String[] {
+            "bro_index_2017.10.03.19",
+            "bro_index_2017.10.03.20",
+            "bro_index_2017.10.03.21",
+            "snort_index_2017.10.03.19",
+            "snort_index_2017.10.03.20",
+            "snort_index_2017.10.03.21"
+    };
+    ElasticsearchColumnMetadataDao dao = setup(existingIndices);
+
+    // get the latest indices
+    List<String> args = Collections.singletonList("bro");
+    String[] actual = dao.getLatestIndices(args);
+
+    // validation
+    String[] expected = new String[] { "bro_index_2017.10.03.21" };
+    assertArrayEquals(expected, actual);
+  }
+
+  /**
+   * The most recent index of each requested sensor should be returned.
+   */
+  @Test
+  public void testGetLatestIndices() {
+    // setup - three hourly indices per sensor
+    String[] existingIndices = new String[] {
+            "bro_index_2017.10.03.19",
+            "bro_index_2017.10.03.20",
+            "bro_index_2017.10.03.21",
+            "snort_index_2017.10.03.19",
+            "snort_index_2017.10.03.20",
+            "snort_index_2017.10.03.21"
+    };
+    ElasticsearchColumnMetadataDao dao = setup(existingIndices);
+
+    // get the latest indices
+    List<String> args = Arrays.asList("bro", "snort");
+    String[] actual = dao.getLatestIndices(args);
+
+    // validation
+    String[] expected = new String[] { "bro_index_2017.10.03.21", "snort_index_2017.10.03.21" };
+    assertArrayEquals(expected, actual);
+  }
+
+  /**
+   * When no indices exist, no latest indices should be returned.
+   */
+  @Test
+  public void testLatestIndicesWhereNoneExist() {
+
+    // setup - there are no existing indices
+    String[] existingIndices = new String[] {};
+    ElasticsearchColumnMetadataDao dao = setup(existingIndices);
+
+    // get the latest indices
+    List<String> args = Arrays.asList("bro", "snort");
+    String[] actual = dao.getLatestIndices(args);
+
+    // validation
+    String[] expected = new String[] {};
+    assertArrayEquals(expected, actual);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/4a089900/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
index 7c33018..a6c0aa6 100644
--- 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
@@ -17,98 +17,209 @@
  */
 package org.apache.metron.elasticsearch.dao;
 
-import org.apache.metron.elasticsearch.matcher.SearchRequestMatcher;
+import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
-import org.apache.metron.indexing.dao.IndexDao;
-import org.apache.metron.indexing.dao.search.*;
-import org.elasticsearch.action.ActionFuture;
+import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
+import org.apache.metron.indexing.dao.search.SearchRequest;
+import org.apache.metron.indexing.dao.search.SearchResponse;
+import org.apache.metron.indexing.dao.search.SortField;
+import org.apache.metron.indexing.dao.search.SortOrder;
 import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
-import org.junit.Assert;
-import org.junit.Before;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
 import org.junit.Test;
-import org.mockito.Mock;
+import org.mockito.ArgumentCaptor;
 
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Mockito.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class ElasticsearchDaoTest {
 
-  private IndexDao searchService;
+  private ElasticsearchDao dao;
+  private ElasticsearchRequestSubmitter requestSubmitter;
 
-  @Mock
-  TransportClient client;
+  /**
+   * Builds the DAO under test on top of mocks. The mocked request submitter
+   * returns a canned response holding two hits, and the mocked column metadata
+   * DAO returns the given metadata.
+   *
+   * @param status The status reported by the mocked search response.
+   * @param maxSearchResults The max search results reported by the mocked AccessConfig.
+   * @param metadata The column metadata returned by the mocked ColumnMetadataDao.
+   */
+  private void setup(RestStatus status, int maxSearchResults, Map<String, 
FieldType> metadata) throws Exception {
 
-  @Before
-  public void setUp() throws Exception {
-    client = mock(TransportClient.class);
+    // setup the mock search hits
+    SearchHit hit1 = mock(SearchHit.class);
+    when(hit1.getId()).thenReturn("id1");
+    when(hit1.getSource()).thenReturn(new HashMap<String, Object>(){{ 
put("field", "value1"); }});
+    when(hit1.getScore()).thenReturn(0.1f);
+
+    SearchHit hit2 = mock(SearchHit.class);
+    when(hit2.getId()).thenReturn("id2");
+    when(hit2.getSource()).thenReturn(new HashMap<String, Object>(){{ 
put("field", "value2"); }});
+    when(hit2.getScore()).thenReturn(0.2f);
+
+    // search hits; the reported total matches the number of mocked hits
+    SearchHit[] hits = { hit1, hit2 };
+    SearchHits searchHits = mock(SearchHits.class);
+    when(searchHits.getHits()).thenReturn(hits);
+    
when(searchHits.getTotalHits()).thenReturn(Integer.toUnsignedLong(hits.length));
+
+    // search response which returns the search hits
+    org.elasticsearch.action.search.SearchResponse response = 
mock(org.elasticsearch.action.search.SearchResponse.class);
+    when(response.status()).thenReturn(status);
+    when(response.getHits()).thenReturn(searchHits);
+
+    // provides column metadata
+    ColumnMetadataDao columnMetadataDao = mock(ColumnMetadataDao.class);
+    when(columnMetadataDao.getColumnMetadata(any())).thenReturn(metadata);
+
+    // returns the search response for any submitted request
+    requestSubmitter = mock(ElasticsearchRequestSubmitter.class);
+    when(requestSubmitter.submitSearch(any())).thenReturn(response);
+
+    TransportClient client = mock(TransportClient.class);
+
+    // provides configuration
     AccessConfig config = mock(AccessConfig.class);
-    when(config.getMaxSearchResults()).thenReturn(50);
-    searchService = new ElasticsearchDao(client, config);
+    when(config.getMaxSearchResults()).thenReturn(maxSearchResults);
+
+    dao = new ElasticsearchDao(client, columnMetadataDao, requestSubmitter, 
config);
+  }
+
+  /**
+   * Builds the DAO under test with empty column metadata.
+   *
+   * @param status The status reported by the mocked search response.
+   * @param maxSearchResults The max search results reported by the mocked AccessConfig.
+   */
+  private void setup(RestStatus status, int maxSearchResults) throws Exception 
{
+    setup(status, maxSearchResults, new HashMap<>());
   }
 
+  /**
+   * The Elasticsearch request built by the DAO should carry one sort clause per
+   * requested sort field, in the requested order.
+   */
   @Test
-  public void searchShouldProperlyBuildSearchRequest() throws Exception {
+  public void searchShouldSortByGivenFields() throws Exception {
 
-    // setup the mock client
-    SearchHit searchHit1 = mock(SearchHit.class);
-    when(searchHit1.getId()).thenReturn("id1");
-    when(searchHit1.getSource()).thenReturn(new HashMap<String, Object>(){{ 
put("field", "value1"); }});
-    when(searchHit1.getScore()).thenReturn(0.1f);
+    // setup the column metadata; "sortByUndefinedDesc" is deliberately left out
+    Map<String, FieldType> columnMetadata = new HashMap<>();
+    columnMetadata.put("sortByStringDesc", FieldType.STRING);
+    columnMetadata.put("sortByIntAsc", FieldType.INTEGER);
 
-    SearchHit searchHit2 = mock(SearchHit.class);
-    when(searchHit2.getId()).thenReturn("id2");
-    when(searchHit2.getSource()).thenReturn(new HashMap<String, Object>(){{ 
put("field", "value2"); }});
-    when(searchHit2.getScore()).thenReturn(0.2f);
+    // setup the dao
+    setup(RestStatus.OK, 25, columnMetadata);
 
-    SearchHits searchHits = mock(SearchHits.class);
-    when(searchHits.getHits()).thenReturn(new SearchHit[]{searchHit1, 
searchHit2});
-    when(searchHits.getTotalHits()).thenReturn(2L);
+    // "sort by" fields for the search request
+    SortField[] expectedSortFields = {
+            sortBy("sortByStringDesc", SortOrder.DESC),
+            sortBy("sortByIntAsc", SortOrder.ASC),
+            sortBy("sortByUndefinedDesc", SortOrder.DESC)
+    };
 
-    org.elasticsearch.action.search.SearchResponse elasticsearchResponse = 
mock(org.elasticsearch.action.search.SearchResponse.class);
-    when(elasticsearchResponse.getHits()).thenReturn(searchHits);
+    // create a metron search request
+    final List<String> indices = Arrays.asList("bro", "snort");
+    SearchRequest searchRequest = new SearchRequest();
+    searchRequest.setSize(2);
+    searchRequest.setIndices(indices);
+    searchRequest.setFrom(5);
+    searchRequest.setSort(Arrays.asList(expectedSortFields));
+    searchRequest.setQuery("some query");
 
-    ActionFuture actionFuture = mock(ActionFuture.class);
-    when(actionFuture.actionGet()).thenReturn(elasticsearchResponse);
-    when(client.search(any())).thenReturn(actionFuture);
+    // submit the metron search request
+    SearchResponse searchResponse = dao.search(searchRequest);
+    assertNotNull(searchResponse);
+
+    // capture the elasticsearch search request that was created
+    ArgumentCaptor<org.elasticsearch.action.search.SearchRequest> argument = 
ArgumentCaptor.forClass(org.elasticsearch.action.search.SearchRequest.class);
+    verify(requestSubmitter).submitSearch(argument.capture());
+    org.elasticsearch.action.search.SearchRequest request = 
argument.getValue();
+
+    // transform the request to JSON for validation; "???" is not valid JSON, so
+    // parsing fails if the request could not be rendered
+    JSONParser parser = new JSONParser();
+    JSONObject json = (JSONObject) 
parser.parse(ElasticsearchUtils.toJSON(request).orElse("???"));
+
+    // validate the sort fields
+    JSONArray sortFields = (JSONArray) json.get("sort");
+    assertEquals(3, sortFields.size());
+
+    {
+      // sort by string descending; missing values are expected last
+      JSONObject aSortField = (JSONObject) sortFields.get(0);
+      JSONObject sortBy = (JSONObject) aSortField.get("sortByStringDesc");
+      assertEquals("desc", sortBy.get("order"));
+      assertEquals("_last", sortBy.get("missing"));
+      assertEquals("string", sortBy.get("unmapped_type"));
+    }
+    {
+      // sort by integer ascending; missing values are expected first
+      JSONObject aSortField = (JSONObject) sortFields.get(1);
+      JSONObject sortByIntAsc = (JSONObject) aSortField.get("sortByIntAsc");
+      assertEquals("asc", sortByIntAsc.get("order"));
+      assertEquals("_first", sortByIntAsc.get("missing"));
+      assertEquals("integer", sortByIntAsc.get("unmapped_type"));
+    }
+    {
+      // sort by a field with no column metadata; unmapped_type is expected to be "other"
+      JSONObject aSortField = (JSONObject) sortFields.get(2);
+      JSONObject sortByUndefinedDesc = (JSONObject) 
aSortField.get("sortByUndefinedDesc");
+      assertEquals("desc", sortByUndefinedDesc.get("order"));
+      assertEquals("_last", sortByUndefinedDesc.get("missing"));
+      assertEquals("other", sortByUndefinedDesc.get("unmapped_type"));
+    }
+  }
+
+  /**
+   * The Elasticsearch request built by the DAO should target the wildcard form
+   * of each requested index name.
+   */
+  @Test
+  public void searchShouldWildcardIndices() throws Exception {
+
+    // setup the dao with no column metadata
+    setup(RestStatus.OK, 25);
 
     // "sort by" fields for the search request
-    SortField[] sortFields = {
-            sortBy("sortField1", SortOrder.DESC),
-            sortBy("sortField2", SortOrder.ASC)
+    SortField[] expectedSortFields = {
+            sortBy("sortByStringDesc", SortOrder.DESC),
+            sortBy("sortByIntAsc", SortOrder.ASC),
+            sortBy("sortByUndefinedDesc", SortOrder.DESC)
     };
 
-    // create a search request
+    // create a metron search request
+    final List<String> indices = Arrays.asList("bro", "snort");
     SearchRequest searchRequest = new SearchRequest();
     searchRequest.setSize(2);
-    searchRequest.setIndices(Arrays.asList("bro", "snort"));
+    searchRequest.setIndices(indices);
     searchRequest.setFrom(5);
-    searchRequest.setSort(Arrays.asList(sortFields));
+    searchRequest.setSort(Arrays.asList(expectedSortFields));
     searchRequest.setQuery("some query");
 
-    // submit the search request
-    SearchResponse searchResponse = searchService.search(searchRequest);
-
-    // validate
-    String[] expectedIndices = {"bro_index*", "snort_index*"};
-    verify(client).search(argThat(new SearchRequestMatcher(expectedIndices, 
"some query", 2, 5, sortFields)));
-    assertEquals(2, searchResponse.getTotal());
-    List<SearchResult> actualSearchResults = searchResponse.getResults();
-    assertEquals(2, actualSearchResults.size());
-    assertEquals("id1", actualSearchResults.get(0).getId());
-    assertEquals("value1", 
actualSearchResults.get(0).getSource().get("field"));
-    assertEquals(0.1f, actualSearchResults.get(0).getScore(), 0.0f);
-    assertEquals("id2", actualSearchResults.get(1).getId());
-    assertEquals("value2", 
actualSearchResults.get(1).getSource().get("field"));
-    assertEquals(0.2f, actualSearchResults.get(1).getScore(), 0.0f);
-    verifyNoMoreInteractions(client);
+    // submit the metron search request
+    SearchResponse searchResponse = dao.search(searchRequest);
+    assertNotNull(searchResponse);
+
+    // capture the elasticsearch search request that was created
+    ArgumentCaptor<org.elasticsearch.action.search.SearchRequest> argument = 
ArgumentCaptor.forClass(org.elasticsearch.action.search.SearchRequest.class);
+    verify(requestSubmitter).submitSearch(argument.capture());
+    org.elasticsearch.action.search.SearchRequest request = 
argument.getValue();
+
+    // transform the request to JSON; the parsed result is not asserted on in this test
+    JSONParser parser = new JSONParser();
+    JSONObject json = (JSONObject) 
parser.parse(ElasticsearchUtils.toJSON(request).orElse("???"));
+
+    // ensure that the index names are 'wildcard-ed'
+    String[] expected = { "bro_index*", "snort_index*" };
+    assertArrayEquals(expected, request.indices());
+  }
+
+
+  /**
+   * A search whose requested size is above the configured maximum should be
+   * rejected with an InvalidSearchException.
+   */
+  @Test(expected = InvalidSearchException.class)
+  public void searchShouldThrowExceptionWhenMaxResultsAreExceeded() throws 
Exception {
+
+    int maxSearchResults = 20;
+    setup(RestStatus.OK, maxSearchResults);
+
+    SearchRequest searchRequest = new SearchRequest();
+    searchRequest.setSize(maxSearchResults+1);
+
+    dao.search(searchRequest);
+    // InvalidSearchException expected - the requested size is one above the maximum
   }
 
   private SortField sortBy(String field, SortOrder order) {
@@ -118,19 +229,4 @@ public class ElasticsearchDaoTest {
     return sortField;
   }
 
-  @Test
-  public void searchShouldThrowExceptionWhenMaxResultsAreExceeded() throws 
Exception {
-    SearchRequest searchRequest = new SearchRequest();
-    searchRequest.setSize(51);
-    try {
-      searchService.search(searchRequest);
-      Assert.fail("Did not throw expected exception");
-    }
-    catch(InvalidSearchException ise) {
-      Assert.assertEquals("Search result size must be less than 50", 
ise.getMessage());
-    }
-  }
-
-
-
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/4a089900/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
new file mode 100644
index 0000000..26f5fff
--- /dev/null
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.dao;
+
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
+import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.ShardSearchFailure;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.SearchShardTarget;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests the ElasticsearchRequestSubmitter class.
+ */
+public class ElasticsearchRequestSubmitterTest {
+
+  /**
+   * Creates a submitter whose mocked client answers every search with the
+   * given response.
+   *
+   * @param response The response that the mocked client will return.
+   * @return An object to test.
+   */
+  public ElasticsearchRequestSubmitter setup(SearchResponse response) {
+
+    // mocks
+    TransportClient client = mock(TransportClient.class);
+
+    // mock() can only hand back the raw type, so the conversion is unchecked
+    @SuppressWarnings("unchecked")
+    ActionFuture<SearchResponse> future = Mockito.mock(ActionFuture.class);
+
+    // the client should return the given search response
+    when(client.search(any())).thenReturn(future);
+    when(future.actionGet()).thenReturn(response);
+
+    return new ElasticsearchRequestSubmitter(client);
+  }
+
+  /**
+   * A response with status OK and no failed shards should be returned to the caller.
+   */
+  @Test
+  public void searchShouldSucceedWhenOK() throws InvalidSearchException {
+
+    // mocks
+    SearchResponse response = mock(SearchResponse.class);
+    SearchRequest request = mock(SearchRequest.class);
+
+    // response will have status of OK and no failed shards
+    when(response.status()).thenReturn(RestStatus.OK);
+    when(response.getFailedShards()).thenReturn(0);
+    when(response.getTotalShards()).thenReturn(2);
+
+    // search should succeed
+    ElasticsearchRequestSubmitter submitter = setup(response);
+    SearchResponse actual = submitter.submitSearch(request);
+    assertNotNull(actual);
+  }
+
+  /**
+   * A response with a status other than OK should cause an InvalidSearchException.
+   */
+  @Test(expected = InvalidSearchException.class)
+  public void searchShouldFailWhenNotOK() throws InvalidSearchException {
+
+    // mocks
+    SearchResponse response = mock(SearchResponse.class);
+    SearchRequest request = mock(SearchRequest.class);
+
+    // response will have a status other than OK, but no failed shards
+    when(response.status()).thenReturn(RestStatus.PARTIAL_CONTENT);
+    when(response.getFailedShards()).thenReturn(0);
+    when(response.getTotalShards()).thenReturn(2);
+
+    // search should fail - the exception is expected by the test annotation
+    ElasticsearchRequestSubmitter submitter = setup(response);
+    submitter.submitSearch(request);
+  }
+
+  /**
+   * A response with status OK should be returned even when some shards failed.
+   */
+  @Test
+  public void searchShouldHandleShardFailure() throws InvalidSearchException {
+    // mocks
+    SearchResponse response = mock(SearchResponse.class);
+    SearchRequest request = mock(SearchRequest.class);
+    ShardSearchFailure fail = mock(ShardSearchFailure.class);
+    SearchShardTarget target = mock(SearchShardTarget.class);
+
+    // response will have status of OK
+    when(response.status()).thenReturn(RestStatus.OK);
+
+    // the response will report shard failures
+    when(response.getFailedShards()).thenReturn(1);
+    when(response.getTotalShards()).thenReturn(2);
+
+    // the response will return the failures
+    ShardSearchFailure[] failures = { fail };
+    when(response.getShardFailures()).thenReturn(failures);
+
+    // shard failure needs to report the node
+    when(fail.shard()).thenReturn(target);
+    when(target.getNodeId()).thenReturn("node1");
+
+    // shard failure needs to report details of failure
+    when(fail.index()).thenReturn("bro_index_2017-10-11");
+    when(fail.shardId()).thenReturn(1);
+
+    // search should succeed, even with failed shards
+    ElasticsearchRequestSubmitter submitter = setup(response);
+    SearchResponse actual = submitter.submitSearch(request);
+    assertNotNull(actual);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/4a089900/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
index 07cc708..3d50e99 100644
--- 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
@@ -17,18 +17,11 @@
  */
 package org.apache.metron.elasticsearch.integration;
 
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.concurrent.ExecutionException;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.elasticsearch.dao.ElasticsearchDao;
-import org.apache.metron.elasticsearch.dao.ElasticsearchMetaAlertDao;
 import 
org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.IndexDao;
-import org.apache.metron.indexing.dao.MetaAlertDao;
 import org.apache.metron.indexing.dao.SearchIntegrationTest;
 import org.apache.metron.integration.InMemoryComponent;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
@@ -43,7 +36,13 @@ import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
 import org.json.simple.parser.ParseException;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.concurrent.ExecutionException;
+
 public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest {
+
   private static String indexDir = "target/elasticsearch_search";
   private static String dateFormat = "yyyy.MM.dd.HH";
   private static final int MAX_RETRIES = 10;
@@ -53,19 +52,46 @@ public class ElasticsearchSearchIntegrationTest extends 
SearchIntegrationTest {
    * {
    * "bro_doc": {
    *   "properties": {
-   *     "source:type": { "type": "string" },
-   *     "ip_src_addr": { "type": "ip" },
-   *     "ip_src_port": { "type": "integer" },
-   *     "long_field": { "type": "long" },
-   *     "timestamp" : { "type": "date" },
-   *     "latitude" : { "type": "float" },
-   *     "score": { "type": "double" },
-   *     "is_alert": { "type": "boolean" },
-   *     "location_point": { "type": "geo_point" },
-   *     "bro_field": { "type": "string" },
-   *     "duplicate_name_field": { "type": "string" }
+   *     "source:type": {
+   *        "type": "string",
+   *        "index": "not_analyzed"
+   *     },
+   *     "ip_src_addr": {
+   *        "type": "ip"
+   *     },
+   *     "ip_src_port": {
+   *        "type": "integer"
+   *     },
+   *     "long_field": {
+   *        "type": "long"
+   *     },
+   *     "timestamp": {
+   *        "type": "date",
+   *        "format": "epoch_millis"
+   *      },
+   *     "latitude" : {
+   *        "type": "float"
+   *      },
+   *     "score": {
+   *        "type": "double"
+   *     },
+   *     "is_alert": {
+   *        "type": "boolean"
+   *     },
+   *     "location_point": {
+   *        "type": "geo_point"
+   *     },
+   *     "bro_field": {
+   *        "type": "string"
+   *     },
+   *     "duplicate_name_field": {
+   *        "type": "string"
+   *     },
+   *     "alert": {
+   *         "type": "nested"
+   *     }
    *   }
-   * }
+   *  }
    * }
    */
   @Multiline
@@ -73,21 +99,51 @@ public class ElasticsearchSearchIntegrationTest extends 
SearchIntegrationTest {
 
   /**
    * {
-   * "snort_doc": {
-   *   "properties": {
-   *     "source:type": { "type": "string" },
-   *     "ip_src_addr": { "type": "ip" },
-   *     "ip_src_port": { "type": "integer" },
-   *     "long_field": { "type": "long" },
-   *     "timestamp" : { "type": "date" },
-   *     "latitude" : { "type": "float" },
-   *     "score": { "type": "double" },
-   *     "is_alert": { "type": "boolean" },
-   *     "location_point": { "type": "geo_point" },
-   *     "snort_field": { "type": "integer" },
-   *     "duplicate_name_field": { "type": "integer" }
-   *   }
-   * }
+   *  "snort_doc": {
+   *     "properties": {
+   *        "source:type": {
+   *          "type": "string",
+   *          "index": "not_analyzed"
+   *        },
+   *        "ip_src_addr": {
+   *          "type": "ip"
+   *        },
+   *        "ip_src_port": {
+   *          "type": "integer"
+   *        },
+   *        "long_field": {
+   *          "type": "long"
+   *        },
+   *        "timestamp": {
+   *          "type": "date",
+   *          "format": "epoch_millis"
+   *        },
+   *        "latitude" : {
+   *          "type": "float"
+   *        },
+   *        "score": {
+   *          "type": "double"
+   *        },
+   *        "is_alert": {
+   *          "type": "boolean"
+   *        },
+   *        "location_point": {
+   *          "type": "geo_point"
+   *        },
+   *        "snort_field": {
+   *          "type": "integer"
+   *        },
+   *        "duplicate_name_field": {
+   *          "type": "integer"
+   *        },
+   *        "alert": {
+   *           "type": "nested"
+   *        },
+   *        "threat:triage:score": {
+   *           "type": "float"
+   *        }
+   *      }
+   *    }
    * }
    */
   @Multiline
@@ -106,27 +162,23 @@ public class ElasticsearchSearchIntegrationTest extends 
SearchIntegrationTest {
   @Multiline
   private static String metaAlertTypeMappings;
 
-
+  /**
+   * Creates an ElasticsearchDao initialized with a limit of 100 search results
+   * and 100 search groups, and a global config pointing at the "metron" cluster
+   * on localhost:9300.
+   */
   @Override
   protected IndexDao createDao() throws Exception {
-    IndexDao elasticsearchDao = new ElasticsearchDao();
-    elasticsearchDao.init(
-            new AccessConfig() {{
-              setMaxSearchResults(100);
-              setMaxSearchGroups(100);
-              setGlobalConfigSupplier( () ->
-                new HashMap<String, Object>() {{
-                  put("es.clustername", "metron");
-                  put("es.port", "9300");
-                  put("es.ip", "localhost");
-                  put("es.date.format", dateFormat);
-                  }}
-              );
+    AccessConfig config = new AccessConfig();
+    config.setMaxSearchResults(100);
+    config.setMaxSearchGroups(100);
+    config.setGlobalConfigSupplier( () ->
+            new HashMap<String, Object>() {{
+              put("es.clustername", "metron");
+              put("es.port", "9300");
+              put("es.ip", "localhost");
+              put("es.date.format", dateFormat);
             }}
     );
-    MetaAlertDao ret = new ElasticsearchMetaAlertDao();
-    ret.init(elasticsearchDao);
-    return elasticsearchDao;
+
+    IndexDao dao = new ElasticsearchDao();
+    dao.init(config);
+    return dao;
   }
 
   @Override

Reply via email to