Repository: metron
Updated Branches:
  refs/heads/master 14bcc0389 -> 19096fe62


METRON-1056 Get field types from Elasticsearch (merrimanr) closes 
apache/metron#662


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/19096fe6
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/19096fe6
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/19096fe6

Branch: refs/heads/master
Commit: 19096fe62c4728d326bfc87b2aa831c6aec3d5b2
Parents: 14bcc03
Author: merrimanr <[email protected]>
Authored: Tue Aug 1 15:14:31 2017 -0500
Committer: merrimanr <[email protected]>
Committed: Tue Aug 1 15:14:31 2017 -0500

----------------------------------------------------------------------
 metron-interface/metron-rest/README.md          |  16 +++
 .../rest/controller/SearchController.java       |  18 ++++
 .../metron/rest/service/SearchService.java      |   6 ++
 .../service/impl/IndexDaoSearchServiceImpl.java |  25 +++++
 .../SearchControllerIntegrationTest.java        |  76 ++++++++++++++
 .../elasticsearch/dao/ElasticsearchDao.java     |  94 ++++++++++++++++-
 .../ElasticsearchDaoIntegrationTest.java        |  50 +++++++++
 .../apache/metron/indexing/dao/IndexDao.java    |   5 +
 .../metron/indexing/dao/search/FieldType.java   |  52 ++++++++++
 .../apache/metron/indexing/dao/InMemoryDao.java |  32 ++++++
 .../dao/IndexingDaoIntegrationTest.java         | 101 +++++++++++++++++--
 11 files changed, 464 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/19096fe6/metron-interface/metron-rest/README.md
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/README.md 
b/metron-interface/metron-rest/README.md
index add8f27..46a3fc0 100644
--- a/metron-interface/metron-rest/README.md
+++ b/metron-interface/metron-rest/README.md
@@ -200,6 +200,8 @@ Request and Response objects are JSON formatted.  The JSON 
schemas are available
 | [ `DELETE /api/v1/kafka/topic/{name}`](#delete-apiv1kafkatopicname)|
 | [ `GET /api/v1/kafka/topic/{name}/sample`](#get-apiv1kafkatopicnamesample)|
 | [ `GET /api/v1/search/search`](#get-apiv1searchsearch)|
+| [ `GET /api/v1/search/column/metadata`](#get-apiv1searchcolumnmetadata)|
+| [ `GET /api/v1/search/column/metadata/common`](#get-apiv1searchcolumnmetadatacommon)|
 | [ `GET /api/v1/sensor/enrichment/config`](#get-apiv1sensorenrichmentconfig)|
 | [ `GET 
/api/v1/sensor/enrichment/config/list/available/enrichments`](#get-apiv1sensorenrichmentconfiglistavailableenrichments)|
 | [ `GET 
/api/v1/sensor/enrichment/config/list/available/threat/triage/aggregators`](#get-apiv1sensorenrichmentconfiglistavailablethreattriageaggregators)|
@@ -354,6 +356,20 @@ Request and Response objects are JSON formatted.  The JSON 
schemas are available
       * searchRequest - Search request
   * Returns:
     * 200 - Search results
+    
+### `GET /api/v1/search/column/metadata`
+  * Description: Get column metadata for each index in the list of indices
+  * Input:
+      * indices - Indices
+  * Returns:
+    * 200 - Column Metadata
+    
+### `GET /api/v1/search/column/metadata/common`
+  * Description: Get metadata for columns shared by the list of indices
+  * Input:
+      * indices - Indices
+  * Returns:
+    * 200 - Common Column Metadata
 
 ### `GET /api/v1/sensor/enrichment/config`
   * Description: Retrieves all SensorEnrichmentConfigs from Zookeeper

http://git-wip-us.apache.org/repos/asf/metron/blob/19096fe6/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/SearchController.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/SearchController.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/SearchController.java
index 6915666..d3b805f 100644
--- 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/SearchController.java
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/SearchController.java
@@ -20,6 +20,7 @@ package org.apache.metron.rest.controller;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
 import io.swagger.annotations.ApiResponse;
+import org.apache.metron.indexing.dao.search.FieldType;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
@@ -32,6 +33,9 @@ import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.RestController;
 
+import java.util.List;
+import java.util.Map;
+
 @RestController
 @RequestMapping("/api/v1/search")
 public class SearchController {
@@ -45,4 +49,18 @@ public class SearchController {
   ResponseEntity<SearchResponse> search(final @ApiParam(name = 
"searchRequest", value = "Search request", required = true) @RequestBody 
SearchRequest searchRequest) throws RestException {
     return new ResponseEntity<>(searchService.search(searchRequest), 
HttpStatus.OK);
   }
+
+  @ApiOperation(value = "Get column metadata for each index in the list of 
indices")
+  @ApiResponse(message = "Column Metadata", code = 200)
+  @RequestMapping(value = "/column/metadata", method = RequestMethod.POST)
+  ResponseEntity<Map<String, Map<String, FieldType>>> getColumnMetadata(final 
@ApiParam(name = "indices", value = "Indices", required = true) @RequestBody 
List<String> indices) throws RestException {
+    return new ResponseEntity<>(searchService.getColumnMetadata(indices), 
HttpStatus.OK);
+  }
+
+  @ApiOperation(value = "Get metadata for columns shared by the list of 
indices")
+  @ApiResponse(message = "Common Column Metadata", code = 200)
+  @RequestMapping(value = "/column/metadata/common", method = 
RequestMethod.POST)
+  ResponseEntity<Map<String, FieldType>> getCommonColumnMetadata(final 
@ApiParam(name = "indices", value = "Indices", required = true) @RequestBody 
List<String> indices) throws RestException {
+    return new 
ResponseEntity<>(searchService.getCommonColumnMetadata(indices), HttpStatus.OK);
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/19096fe6/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SearchService.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SearchService.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SearchService.java
index df28d4a..b2fb2e6 100644
--- 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SearchService.java
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SearchService.java
@@ -17,12 +17,18 @@
  */
 package org.apache.metron.rest.service;
 
+import org.apache.metron.indexing.dao.search.FieldType;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 
+import java.util.List;
+import java.util.Map;
+
 public interface SearchService {
 
   SearchResponse search(SearchRequest searchRequest) throws RestException;
+  Map<String, Map<String, FieldType>> getColumnMetadata(List<String> indices) 
throws RestException;
+  Map<String, FieldType> getCommonColumnMetadata(List<String> indices) throws 
RestException;
 
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/19096fe6/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/IndexDaoSearchServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/IndexDaoSearchServiceImpl.java
 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/IndexDaoSearchServiceImpl.java
index 123d6d0..5ff22c9 100644
--- 
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/IndexDaoSearchServiceImpl.java
+++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/IndexDaoSearchServiceImpl.java
@@ -21,11 +21,16 @@ import org.apache.metron.indexing.dao.IndexDao;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
+import org.apache.metron.indexing.dao.search.FieldType;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.service.SearchService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
 @Service
 public class IndexDaoSearchServiceImpl implements SearchService {
   private IndexDao dao;
@@ -44,4 +49,24 @@ public class IndexDaoSearchServiceImpl implements 
SearchService {
       throw new RestException(ise.getMessage(), ise);
     }
   }
+
+  @Override
+  public Map<String, Map<String, FieldType>> getColumnMetadata(List<String> 
indices) throws RestException {
+    try {
+      return dao.getColumnMetadata(indices);
+    }
+    catch(IOException ioe) {
+      throw new RestException(ioe.getMessage(), ioe);
+    }
+  }
+
+  @Override
+  public Map<String, FieldType> getCommonColumnMetadata(List<String> indices) 
throws RestException {
+    try {
+      return dao.getCommonColumnMetadata(indices);
+    }
+    catch(IOException ioe) {
+      throw new RestException(ioe.getMessage(), ioe);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/19096fe6/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java
 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java
index f1eb1ae..44d9078 100644
--- 
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java
+++ 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java
@@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableMap;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.indexing.dao.InMemoryDao;
 import org.apache.metron.indexing.dao.IndexingDaoIntegrationTest;
+import org.apache.metron.indexing.dao.search.FieldType;
 import org.apache.metron.rest.service.SearchService;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
@@ -45,6 +46,7 @@ import java.util.List;
 import java.util.Map;
 
 import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
+import static org.hamcrest.Matchers.hasSize;
 import static 
org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf;
 import static 
org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic;
 import static 
org.springframework.security.test.web.servlet.setup.SecurityMockMvcConfigurers.springSecurity;
@@ -77,6 +79,7 @@ public class SearchControllerIntegrationTest {
   public void setup() throws Exception {
     this.mockMvc = 
MockMvcBuilders.webAppContextSetup(this.wac).apply(springSecurity()).build();
     loadTestData();
+    loadColumnTypes();
   }
 
   @After
@@ -176,6 +179,62 @@ public class SearchControllerIntegrationTest {
             
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
             .andExpect(jsonPath("$.responseCode").value(500))
             .andExpect(jsonPath("$.message").value("Search result size must be 
less than 100"));
+
+    this.mockMvc.perform(post(searchUrl + 
"/column/metadata").with(httpBasic(user, 
password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content("[\"bro\",\"snort\"]"))
+            .andExpect(status().isOk())
+            
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+            .andExpect(jsonPath("$.*", hasSize(2)))
+            .andExpect(jsonPath("$.bro.common_string_field").value("string"))
+            .andExpect(jsonPath("$.bro.common_integer_field").value("integer"))
+            .andExpect(jsonPath("$.bro.bro_field").value("boolean"))
+            .andExpect(jsonPath("$.bro.duplicate_field").value("date"))
+            .andExpect(jsonPath("$.snort.common_string_field").value("string"))
+            
.andExpect(jsonPath("$.snort.common_integer_field").value("integer"))
+            .andExpect(jsonPath("$.snort.snort_field").value("double"))
+            .andExpect(jsonPath("$.snort.duplicate_field").value("long"));
+
+    this.mockMvc.perform(post(searchUrl + 
"/column/metadata/common").with(httpBasic(user, 
password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content("[\"bro\",\"snort\"]"))
+            .andExpect(status().isOk())
+            
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+            .andExpect(jsonPath("$.*", hasSize(2)))
+            .andExpect(jsonPath("$.common_string_field").value("string"))
+            .andExpect(jsonPath("$.common_integer_field").value("integer"));
+
+    this.mockMvc.perform(post(searchUrl + 
"/column/metadata").with(httpBasic(user, 
password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content("[\"bro\"]"))
+            .andExpect(status().isOk())
+            
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+            .andExpect(jsonPath("$.*", hasSize(1)))
+            .andExpect(jsonPath("$.bro.common_string_field").value("string"))
+            .andExpect(jsonPath("$.bro.common_integer_field").value("integer"))
+            .andExpect(jsonPath("$.bro.bro_field").value("boolean"))
+            .andExpect(jsonPath("$.bro.duplicate_field").value("date"));
+
+    this.mockMvc.perform(post(searchUrl + 
"/column/metadata/common").with(httpBasic(user, 
password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content("[\"bro\"]"))
+            .andExpect(status().isOk())
+            
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+            .andExpect(jsonPath("$.*", hasSize(4)))
+            .andExpect(jsonPath("$.common_string_field").value("string"))
+            .andExpect(jsonPath("$.common_integer_field").value("integer"))
+            .andExpect(jsonPath("$.bro_field").value("boolean"))
+            .andExpect(jsonPath("$.duplicate_field").value("date"));
+
+    this.mockMvc.perform(post(searchUrl + 
"/column/metadata").with(httpBasic(user, 
password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content("[\"snort\"]"))
+            .andExpect(status().isOk())
+            
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+            .andExpect(jsonPath("$.*", hasSize(1)))
+            .andExpect(jsonPath("$.snort.common_string_field").value("string"))
+            
.andExpect(jsonPath("$.snort.common_integer_field").value("integer"))
+            .andExpect(jsonPath("$.snort.snort_field").value("double"))
+            .andExpect(jsonPath("$.snort.duplicate_field").value("long"));
+
+    this.mockMvc.perform(post(searchUrl + 
"/column/metadata/common").with(httpBasic(user, 
password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content("[\"snort\"]"))
+            .andExpect(status().isOk())
+            
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8")))
+            .andExpect(jsonPath("$.*", hasSize(4)))
+            .andExpect(jsonPath("$.common_string_field").value("string"))
+            .andExpect(jsonPath("$.common_integer_field").value("integer"))
+            .andExpect(jsonPath("$.snort_field").value("double"))
+            .andExpect(jsonPath("$.duplicate_field").value("long"));
   }
 
 
@@ -199,4 +258,21 @@ public class SearchControllerIntegrationTest {
     }
     InMemoryDao.load(backingStore);
   }
+
+  private void loadColumnTypes() throws ParseException {
+    Map<String, Map<String, FieldType>> columnTypes = new HashMap<>();
+    Map<String, FieldType> broTypes = new HashMap<>();
+    broTypes.put("common_string_field", FieldType.STRING);
+    broTypes.put("common_integer_field", FieldType.INTEGER);
+    broTypes.put("bro_field", FieldType.BOOLEAN);
+    broTypes.put("duplicate_field", FieldType.DATE);
+    Map<String, FieldType> snortTypes = new HashMap<>();
+    snortTypes.put("common_string_field", FieldType.STRING);
+    snortTypes.put("common_integer_field", FieldType.INTEGER);
+    snortTypes.put("snort_field", FieldType.DOUBLE);
+    snortTypes.put("duplicate_field", FieldType.LONG);
+    columnTypes.put("bro", broTypes);
+    columnTypes.put("snort", snortTypes);
+    InMemoryDao.setColumnMetadata(columnTypes);
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/19096fe6/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
index a4838b5..cb2b1ca 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
@@ -21,29 +21,54 @@ import 
org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.IndexDao;
 import org.apache.metron.indexing.dao.search.*;
-import org.apache.metron.indexing.dao.search.SortOrder;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
 import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.cluster.metadata.MappingMetaData;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.index.query.QueryStringQueryBuilder;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.sort.*;
 
+import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
 public class ElasticsearchDao implements IndexDao {
   private transient TransportClient client;
   private AccessConfig accessConfig;
+  private List<String> ignoredIndices = new ArrayList<>();
 
   protected ElasticsearchDao(TransportClient client, AccessConfig config) {
     this.client = client;
     this.accessConfig = config;
+    this.ignoredIndices.add(".kibana");
   }
 
   public ElasticsearchDao() {
     //uninitialized.
   }
 
+  private static Map<String, FieldType> elasticsearchSearchTypeMap;
+
+  static {
+    Map<String, FieldType> fieldTypeMap = new HashMap<>();
+    fieldTypeMap.put("string", FieldType.STRING);
+    fieldTypeMap.put("ip", FieldType.IP);
+    fieldTypeMap.put("integer", FieldType.INTEGER);
+    fieldTypeMap.put("long", FieldType.LONG);
+    fieldTypeMap.put("date", FieldType.DATE);
+    fieldTypeMap.put("float", FieldType.FLOAT);
+    fieldTypeMap.put("double", FieldType.DOUBLE);
+    fieldTypeMap.put("boolean", FieldType.BOOLEAN);
+    elasticsearchSearchTypeMap = Collections.unmodifiableMap(fieldTypeMap);
+  }
+
   @Override
   public SearchResponse search(SearchRequest searchRequest) throws 
InvalidSearchException {
     if(client == null) {
@@ -87,4 +112,71 @@ public class ElasticsearchDao implements IndexDao {
     this.client = ElasticsearchUtils.getClient(globalConfig, 
config.getOptionalSettings());
     this.accessConfig = config;
   }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Map<String, Map<String, FieldType>> getColumnMetadata(List<String> 
indices) throws IOException {
+    Map<String, Map<String, FieldType>> allColumnMetadata = new HashMap<>();
+    ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> 
mappings =
+            client.admin().indices().getMappings(new 
GetMappingsRequest().indices(getLatestIndices(indices))).actionGet().getMappings();
+    for(Object index: mappings.keys().toArray()) {
+      Map<String, FieldType> indexColumnMetadata = new HashMap<>();
+      ImmutableOpenMap<String, MappingMetaData> mapping = 
mappings.get(index.toString());
+      Iterator<String> mappingIterator = mapping.keysIt();
+      while(mappingIterator.hasNext()) {
+        MappingMetaData mappingMetaData = mapping.get(mappingIterator.next());
+        Map<String, Map<String, String>> map = (Map<String, Map<String, 
String>>) mappingMetaData.getSourceAsMap().get("properties");
+        for(String field: map.keySet()) {
+          indexColumnMetadata.put(field, 
elasticsearchSearchTypeMap.getOrDefault(map.get(field).get("type"), 
FieldType.OTHER));
+        }
+      }
+      allColumnMetadata.put(index.toString().split("_index_")[0], 
indexColumnMetadata);
+    }
+    return allColumnMetadata;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public Map<String, FieldType> getCommonColumnMetadata(List<String> indices) 
throws IOException {
+    Map<String, FieldType> commonColumnMetadata = null;
+    ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> 
mappings =
+            client.admin().indices().getMappings(new 
GetMappingsRequest().indices(getLatestIndices(indices))).actionGet().getMappings();
+    for(Object index: mappings.keys().toArray()) {
+      ImmutableOpenMap<String, MappingMetaData> mapping = 
mappings.get(index.toString());
+      Iterator<String> mappingIterator = mapping.keysIt();
+      while(mappingIterator.hasNext()) {
+        MappingMetaData mappingMetaData = mapping.get(mappingIterator.next());
+        Map<String, Map<String, String>> map = (Map<String, Map<String, 
String>>) mappingMetaData.getSourceAsMap().get("properties");
+        Map<String, FieldType> mappingsWithTypes = 
map.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey,
+                e-> 
elasticsearchSearchTypeMap.getOrDefault(e.getValue().get("type"), 
FieldType.OTHER)));
+        if (commonColumnMetadata == null) {
+          commonColumnMetadata = mappingsWithTypes;
+        } else {
+          
commonColumnMetadata.entrySet().retainAll(mappingsWithTypes.entrySet());
+        }
+      }
+    }
+    return commonColumnMetadata;
+  }
+
+  protected String[] getLatestIndices(List<String> includeIndices) {
+    Map<String, String> latestIndices = new HashMap<>();
+    String[] indices = 
client.admin().indices().prepareGetIndex().setFeatures().get().getIndices();
+    for (String index : indices) {
+      if (!ignoredIndices.contains(index)) {
+        int prefixEnd = index.indexOf("_index_");
+        if (prefixEnd != -1) {
+          String prefix = index.substring(0, prefixEnd);
+          if (includeIndices.contains(prefix)) {
+            String latestIndex = latestIndices.get(prefix);
+            if (latestIndex == null || index.compareTo(latestIndex) > 0) {
+              latestIndices.put(prefix, index);
+            }
+          }
+        }
+      }
+    }
+    return latestIndices.values().toArray(new String[latestIndices.size()]);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/19096fe6/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchDaoIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchDaoIntegrationTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchDaoIntegrationTest.java
index d937fff..ffc41b3 100644
--- 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchDaoIntegrationTest.java
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchDaoIntegrationTest.java
@@ -18,6 +18,7 @@
 package org.apache.metron.elasticsearch.integration;
 
 
+import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.elasticsearch.dao.ElasticsearchDao;
 import 
org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
 import org.apache.metron.indexing.dao.AccessConfig;
@@ -41,6 +42,50 @@ public class ElasticsearchDaoIntegrationTest extends 
IndexingDaoIntegrationTest
   private static String indexDir = "target/elasticsearch_search";
   private static String dateFormat = "yyyy.MM.dd.HH";
 
+  /**
+   * {
+   * "bro_doc": {
+   *   "properties": {
+   *     "source:type": { "type": "string" },
+   *     "ip_src_addr": { "type": "ip" },
+   *     "ip_src_port": { "type": "integer" },
+   *     "long_field": { "type": "long" },
+   *     "timestamp" : { "type": "date" },
+   *     "latitude" : { "type": "float" },
+   *     "double_field": { "type": "double" },
+   *     "is_alert": { "type": "boolean" },
+   *     "location_point": { "type": "geo_point" },
+   *     "bro_field": { "type": "string" },
+   *     "duplicate_name_field": { "type": "string" }
+   *   }
+   * }
+   * }
+   */
+  @Multiline
+  private static String broTypeMappings;
+
+  /**
+   * {
+   * "snort_doc": {
+   *   "properties": {
+   *     "source:type": { "type": "string" },
+   *     "ip_src_addr": { "type": "ip" },
+   *     "ip_src_port": { "type": "integer" },
+   *     "long_field": { "type": "long" },
+   *     "timestamp" : { "type": "date" },
+   *     "latitude" : { "type": "float" },
+   *     "double_field": { "type": "double" },
+   *     "is_alert": { "type": "boolean" },
+   *     "location_point": { "type": "geo_point" },
+   *     "snort_field": { "type": "integer" },
+   *     "duplicate_name_field": { "type": "integer" }
+   *   }
+   * }
+   * }
+   */
+  @Multiline
+  private static String snortTypeMappings;
+
 
   @Override
   protected IndexDao createDao() throws Exception {
@@ -72,6 +117,11 @@ public class ElasticsearchDaoIntegrationTest extends 
IndexingDaoIntegrationTest
   @Override
   protected void loadTestData() throws ParseException {
     ElasticSearchComponent es = (ElasticSearchComponent)indexComponent;
+    es.getClient().admin().indices().prepareCreate("bro_index_2017.01.01.01")
+            .addMapping("bro_doc", broTypeMappings).get();
+    es.getClient().admin().indices().prepareCreate("snort_index_2017.01.01.02")
+            .addMapping("snort_doc", snortTypeMappings).get();
+
     BulkRequestBuilder bulkRequest = 
es.getClient().prepareBulk().setRefresh(true);
     JSONArray broArray = (JSONArray) new JSONParser().parse(broData);
     for(Object o: broArray) {

http://git-wip-us.apache.org/repos/asf/metron/blob/19096fe6/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
index a835d65..31fe74e 100644
--- 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
@@ -20,10 +20,15 @@ package org.apache.metron.indexing.dao;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
+import org.apache.metron.indexing.dao.search.FieldType;
 
+import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 
 public interface IndexDao {
   SearchResponse search(SearchRequest searchRequest) throws 
InvalidSearchException;
   void init(Map<String, Object> globalConfig, AccessConfig config);
+  Map<String, Map<String, FieldType>> getColumnMetadata(List<String> indices) 
throws IOException;
+  Map<String, FieldType> getCommonColumnMetadata(List<String> indices) throws 
IOException;
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/19096fe6/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/FieldType.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/FieldType.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/FieldType.java
new file mode 100644
index 0000000..5848cb3
--- /dev/null
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/FieldType.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.indexing.dao.search;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public enum FieldType {
+  @JsonProperty("string")
+  STRING("string"),
+  @JsonProperty("ip")
+  IP("ip"),
+  @JsonProperty("integer")
+  INTEGER("integer"),
+  @JsonProperty("long")
+  LONG("long"),
+  @JsonProperty("date")
+  DATE("date"),
+  @JsonProperty("float")
+  FLOAT("float"),
+  @JsonProperty("double")
+  DOUBLE("double"),
+  @JsonProperty("boolean")
+  BOOLEAN("boolean"),
+  @JsonProperty("other")
+  OTHER("other");
+
+
+  private String fieldType;
+
+  FieldType(String fieldType) {
+    this.fieldType = fieldType;
+  }
+
+  public String getFieldType() {
+    return fieldType;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/19096fe6/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
 
b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
index 444a9da..ab83c7e 100644
--- 
a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
+++ 
b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
@@ -30,6 +30,7 @@ import java.util.*;
 
 public class InMemoryDao implements IndexDao {
   public static Map<String, List<String>> BACKING_STORE = new HashMap<>();
+  // Field types per index, keyed by index name. Starts out as an empty map (like
+  // BACKING_STORE) rather than null, so clear() can be called before any metadata is set.
+  public static Map<String, Map<String, FieldType>> COLUMN_METADATA = new HashMap<>();
   private AccessConfig config;
 
   @Override
@@ -132,11 +133,42 @@ public class InMemoryDao implements IndexDao {
     this.config = config;
   }
 
+  /**
+   * Returns, for each requested index, a mutable copy of the field types held for it
+   * in {@link #COLUMN_METADATA}.
+   *
+   * @param indices names of the indices to look up
+   * @return map from index name to that index's field name/type pairs
+   * @throws IOException declared by the overridden method; nothing in this body performs I/O
+   */
+  @Override
+  public Map<String, Map<String, FieldType>> getColumnMetadata(List<String> indices) throws IOException {
+    Map<String, Map<String, FieldType>> copiesByIndex = new HashMap<>();
+    indices.forEach(name -> copiesByIndex.put(name, new HashMap<>(COLUMN_METADATA.get(name))));
+    return copiesByIndex;
+  }
+
+  /**
+   * Returns the field name/type pairs that are present, with the same type, in every
+   * requested index.
+   *
+   * <p>The result starts as a copy of the first index's metadata and is then intersected
+   * (by entry, i.e. name and type) with each following index. An empty list of indices
+   * yields an empty map.
+   *
+   * @param indices names of the indices to intersect
+   * @return a new mutable map of the fields shared by all {@code indices}
+   * @throws IOException declared by the overridden method; nothing in this body performs I/O
+   */
+  @Override
+  public Map<String, FieldType> getCommonColumnMetadata(List<String> indices) throws IOException {
+    // null means "no index processed yet". An empty map cannot be used as that marker:
+    // once the running intersection is empty it must stay empty instead of being
+    // re-seeded with the next index's full metadata.
+    Map<String, FieldType> commonColumnMetadata = null;
+    for (String index : indices) {
+      Map<String, FieldType> indexMetadata = COLUMN_METADATA.get(index);
+      if (commonColumnMetadata == null) {
+        commonColumnMetadata = new HashMap<>(indexMetadata);
+      } else {
+        commonColumnMetadata.entrySet().retainAll(indexMetadata.entrySet());
+      }
+    }
+    return commonColumnMetadata == null ? new HashMap<>() : commonColumnMetadata;
+  }
+
+  /**
+   * Replaces {@link #COLUMN_METADATA} with a new map that holds an unmodifiable view of
+   * each index's field types.
+   *
+   * @param columnMetadata field name/type pairs keyed by index name
+   */
+  public static void setColumnMetadata(Map<String, Map<String, FieldType>> columnMetadata) {
+    Map<String, Map<String, FieldType>> readOnlyByIndex = new HashMap<>();
+    columnMetadata.forEach(
+        (index, fields) -> readOnlyByIndex.put(index, Collections.unmodifiableMap(fields)));
+    COLUMN_METADATA = readOnlyByIndex;
+  }
+
+  /**
+   * Points {@link #BACKING_STORE} at the supplied map; the map is stored as-is, not copied.
+   *
+   * @param backingStore the map to use as the backing store
+   */
   public static void load(Map<String, List<String>> backingStore) {
     BACKING_STORE = backingStore;
   }
 
   public static void clear() {
     BACKING_STORE.clear();
+    COLUMN_METADATA.clear();
   }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/19096fe6/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/IndexingDaoIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/IndexingDaoIntegrationTest.java
 
b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/IndexingDaoIntegrationTest.java
index 8b5baef..209c234 100644
--- 
a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/IndexingDaoIntegrationTest.java
+++ 
b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/IndexingDaoIntegrationTest.java
@@ -19,6 +19,7 @@ package org.apache.metron.indexing.dao;
 
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.indexing.dao.search.FieldType;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
@@ -27,16 +28,20 @@ import org.apache.metron.integration.InMemoryComponent;
 import org.json.simple.parser.ParseException;
 import org.junit.*;
 
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 public abstract class IndexingDaoIntegrationTest {
   /**
    * [
-   * {"source:type": "bro", "ip_src_addr":"192.168.1.1", "ip_src_port": 8010, 
"timestamp":1, "rejected":true},
-   * {"source:type": "bro" "ip_src_addr":"192.168.1.2", "ip_src_port": 8009, 
"timestamp":2, "rejected":false},
-   * {"source:type": "bro" "ip_src_addr":"192.168.1.3", "ip_src_port": 8008, 
"timestamp":3, "rejected":true},
-   * {"source:type": "bro" "ip_src_addr":"192.168.1.4", "ip_src_port": 8007, 
"timestamp":4, "rejected":false},
-   * {"source:type": "bro" "ip_src_addr":"192.168.1.5", "ip_src_port": 8006, 
"timestamp":5, "rejected":true}
+   * {"source:type": "bro", "ip_src_addr":"192.168.1.1", "ip_src_port": 8010, 
"long_field": 10000, "timestamp":1, "latitude": 48.5839, "double_field": 
1.00001, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro 
data 1", "duplicate_name_field": "data 1"},
+   * {"source:type": "bro", "ip_src_addr":"192.168.1.2", "ip_src_port": 8009, 
"long_field": 20000, "timestamp":2, "latitude": 48.0001, "double_field": 
1.00002, "is_alert":false, "location_point": "48.5839,7.7455", "bro_field": 
"bro data 2", "duplicate_name_field": "data 2"},
+   * {"source:type": "bro", "ip_src_addr":"192.168.1.3", "ip_src_port": 8008, 
"long_field": 10000, "timestamp":3, "latitude": 48.5839, "double_field": 
1.00003, "is_alert":true, "location_point": "50.0,7.7455", "bro_field": "bro 
data 3", "duplicate_name_field": "data 3"},
+   * {"source:type": "bro", "ip_src_addr":"192.168.1.4", "ip_src_port": 8007, 
"long_field": 10000, "timestamp":4, "latitude": 48.5839, "double_field": 
1.00004, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro 
data 4", "duplicate_name_field": "data 4"},
+   * {"source:type": "bro", "ip_src_addr":"192.168.1.5", "ip_src_port": 8006, 
"long_field": 10000, "timestamp":5, "latitude": 48.5839, "double_field": 
1.00005, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro 
data 5", "duplicate_name_field": "data 5"}
    * ]
    */
   @Multiline
@@ -44,11 +49,11 @@ public abstract class IndexingDaoIntegrationTest {
 
   /**
    * [
-   * {"source:type": "snort" "ip_src_addr":"192.168.1.6", "ip_src_port": 8005, 
"timestamp":6, "is_alert":false},
-   * {"source:type": "snort" "ip_src_addr":"192.168.1.1", "ip_src_port": 8004, 
"timestamp":7, "is_alert":true},
-   * {"source:type": "snort" "ip_src_addr":"192.168.1.7", "ip_src_port": 8003, 
"timestamp":8, "is_alert":false},
-   * {"source:type": "snort" "ip_src_addr":"192.168.1.1", "ip_src_port": 8002, 
"timestamp":9, "is_alert":true},
-   * {"source:type": "snort" "ip_src_addr":"192.168.1.8", "ip_src_port": 8001, 
"timestamp":10, "is_alert":false}
+   * {"source:type": "snort", "ip_src_addr":"192.168.1.6", "ip_src_port": 
8005, "long_field": 10000, "timestamp":6, "latitude": 48.5839, "double_field": 
1.00001, "is_alert":false, "location_point": "50.0,7.7455", "snort_field": 10, 
"duplicate_name_field": 1},
+   * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 
8004, "long_field": 10000, "timestamp":7, "latitude": 48.5839, "double_field": 
1.00002, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 
20, "duplicate_name_field": 2},
+   * {"source:type": "snort", "ip_src_addr":"192.168.1.7", "ip_src_port": 
8003, "long_field": 10000, "timestamp":8, "latitude": 48.5839, "double_field": 
1.00003, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 
30, "duplicate_name_field": 3},
+   * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 
8002, "long_field": 20000, "timestamp":9, "latitude": 48.0001, "double_field": 
1.00004, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 
40, "duplicate_name_field": 4},
+   * {"source:type": "snort", "ip_src_addr":"192.168.1.8", "ip_src_port": 
8001, "long_field": 10000, "timestamp":10, "latitude": 48.5839, "double_field": 
1.00005, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 
50, "duplicate_name_field": 5}
    * ]
    */
   @Multiline
@@ -242,6 +247,82 @@ public abstract class IndexingDaoIntegrationTest {
         Assert.assertEquals("Search result size must be less than 100", 
ise.getMessage());
       }
     }
+    // getColumnMetadata with multiple indices
+    {
+      Map<String, Map<String, FieldType>> fieldTypes = 
dao.getColumnMetadata(Arrays.asList("bro", "snort"));
+      Assert.assertEquals(2, fieldTypes.size());
+      Map<String, FieldType> broTypes = fieldTypes.get("bro");
+      Assert.assertEquals(11, broTypes.size());
+      Assert.assertEquals(FieldType.STRING, broTypes.get("source:type"));
+      Assert.assertEquals(FieldType.IP, broTypes.get("ip_src_addr"));
+      Assert.assertEquals(FieldType.INTEGER, broTypes.get("ip_src_port"));
+      Assert.assertEquals(FieldType.LONG, broTypes.get("long_field"));
+      Assert.assertEquals(FieldType.DATE, broTypes.get("timestamp"));
+      Assert.assertEquals(FieldType.FLOAT, broTypes.get("latitude"));
+      Assert.assertEquals(FieldType.DOUBLE, broTypes.get("double_field"));
+      Assert.assertEquals(FieldType.BOOLEAN, broTypes.get("is_alert"));
+      Assert.assertEquals(FieldType.OTHER, broTypes.get("location_point"));
+      Assert.assertEquals(FieldType.STRING, broTypes.get("bro_field"));
+      Assert.assertEquals(FieldType.STRING, 
broTypes.get("duplicate_name_field"));
+      Map<String, FieldType> snortTypes = fieldTypes.get("snort");
+      Assert.assertEquals(11, snortTypes.size());
+      Assert.assertEquals(FieldType.STRING, snortTypes.get("source:type"));
+      Assert.assertEquals(FieldType.IP, snortTypes.get("ip_src_addr"));
+      Assert.assertEquals(FieldType.INTEGER, snortTypes.get("ip_src_port"));
+      Assert.assertEquals(FieldType.LONG, snortTypes.get("long_field"));
+      Assert.assertEquals(FieldType.DATE, snortTypes.get("timestamp"));
+      Assert.assertEquals(FieldType.FLOAT, snortTypes.get("latitude"));
+      Assert.assertEquals(FieldType.DOUBLE, snortTypes.get("double_field"));
+      Assert.assertEquals(FieldType.BOOLEAN, snortTypes.get("is_alert"));
+      Assert.assertEquals(FieldType.OTHER, snortTypes.get("location_point"));
+      Assert.assertEquals(FieldType.INTEGER, snortTypes.get("snort_field"));
+      Assert.assertEquals(FieldType.INTEGER, 
snortTypes.get("duplicate_name_field"));
+    }
+    // getColumnMetadata with only bro
+    {
+      Map<String, Map<String, FieldType>> fieldTypes = 
dao.getColumnMetadata(Collections.singletonList("bro"));
+      Assert.assertEquals(1, fieldTypes.size());
+      Map<String, FieldType> broTypes = fieldTypes.get("bro");
+      Assert.assertEquals(11, broTypes.size());
+      Assert.assertEquals(FieldType.STRING, broTypes.get("bro_field"));
+    }
+    // getColumnMetadata with only snort
+    {
+      Map<String, Map<String, FieldType>> fieldTypes = 
dao.getColumnMetadata(Collections.singletonList("snort"));
+      Assert.assertEquals(1, fieldTypes.size());
+      Map<String, FieldType> snortTypes = fieldTypes.get("snort");
+      Assert.assertEquals(11, snortTypes.size());
+      Assert.assertEquals(FieldType.INTEGER, snortTypes.get("snort_field"));
+    }
+    // getCommonColumnMetadata with multiple Indices
+    {
+      Map<String, FieldType> fieldTypes = 
dao.getCommonColumnMetadata(Arrays.asList("bro", "snort"));
+      // Should only return fields in both
+      Assert.assertEquals(9, fieldTypes.size());
+      Assert.assertEquals(FieldType.STRING, fieldTypes.get("source:type"));
+      Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr"));
+      Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port"));
+      Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field"));
+      Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp"));
+      Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude"));
+      Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("double_field"));
+      Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert"));
+      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point"));
+    }
+    // getCommonColumnMetadata with only bro
+    {
+      Map<String, FieldType> fieldTypes = 
dao.getCommonColumnMetadata(Collections.singletonList("bro"));
+      Assert.assertEquals(11, fieldTypes.size());
+      Assert.assertEquals(FieldType.STRING, fieldTypes.get("bro_field"));
+      Assert.assertEquals(FieldType.STRING, 
fieldTypes.get("duplicate_name_field"));
+    }
+    // getCommonColumnMetadata with only snort
+    {
+      Map<String, FieldType> fieldTypes = 
dao.getCommonColumnMetadata(Collections.singletonList("snort"));
+      Assert.assertEquals(11, fieldTypes.size());
+      Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("snort_field"));
+      Assert.assertEquals(FieldType.INTEGER, 
fieldTypes.get("duplicate_name_field"));
+    }
   }
 
   @After

Reply via email to