Repository: metron Updated Branches: refs/heads/master 14bcc0389 -> 19096fe62
METRON-1056 Get field types from Elasticsearch (merrimanr) closes apache/metron#662 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/19096fe6 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/19096fe6 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/19096fe6 Branch: refs/heads/master Commit: 19096fe62c4728d326bfc87b2aa831c6aec3d5b2 Parents: 14bcc03 Author: merrimanr <[email protected]> Authored: Tue Aug 1 15:14:31 2017 -0500 Committer: merrimanr <[email protected]> Committed: Tue Aug 1 15:14:31 2017 -0500 ---------------------------------------------------------------------- metron-interface/metron-rest/README.md | 16 +++ .../rest/controller/SearchController.java | 18 ++++ .../metron/rest/service/SearchService.java | 6 ++ .../service/impl/IndexDaoSearchServiceImpl.java | 25 +++++ .../SearchControllerIntegrationTest.java | 76 ++++++++++++++ .../elasticsearch/dao/ElasticsearchDao.java | 94 ++++++++++++++++- .../ElasticsearchDaoIntegrationTest.java | 50 +++++++++ .../apache/metron/indexing/dao/IndexDao.java | 5 + .../metron/indexing/dao/search/FieldType.java | 52 ++++++++++ .../apache/metron/indexing/dao/InMemoryDao.java | 32 ++++++ .../dao/IndexingDaoIntegrationTest.java | 101 +++++++++++++++++-- 11 files changed, 464 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/19096fe6/metron-interface/metron-rest/README.md ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/README.md b/metron-interface/metron-rest/README.md index add8f27..46a3fc0 100644 --- a/metron-interface/metron-rest/README.md +++ b/metron-interface/metron-rest/README.md @@ -200,6 +200,8 @@ Request and Response objects are JSON formatted. 
The JSON schemas are available | [ `DELETE /api/v1/kafka/topic/{name}`](#delete-apiv1kafkatopicname)| | [ `GET /api/v1/kafka/topic/{name}/sample`](#get-apiv1kafkatopicnamesample)| | [ `GET /api/v1/search/search`](#get-apiv1searchsearch)| +| [ `POST /api/v1/search/column/metadata`](#post-apiv1searchcolumnmetadata)| +| [ `POST /api/v1/search/column/metadata/common`](#post-apiv1searchcolumnmetadatacommon)| | [ `GET /api/v1/sensor/enrichment/config`](#get-apiv1sensorenrichmentconfig)| | [ `GET /api/v1/sensor/enrichment/config/list/available/enrichments`](#get-apiv1sensorenrichmentconfiglistavailableenrichments)| | [ `GET /api/v1/sensor/enrichment/config/list/available/threat/triage/aggregators`](#get-apiv1sensorenrichmentconfiglistavailablethreattriageaggregators)| @@ -354,6 +356,20 @@ Request and Response objects are JSON formatted. The JSON schemas are available * searchRequest - Search request * Returns: * 200 - Search results + +### `POST /api/v1/search/column/metadata` + * Description: Get column metadata for each index in the list of indices + * Input: + * indices - Indices + * Returns: + * 200 - Column Metadata + +### `POST /api/v1/search/column/metadata/common` + * Description: Get metadata for columns shared by the list of indices + * Input: + * indices - Indices + * Returns: + * 200 - Common Column Metadata ### `GET /api/v1/sensor/enrichment/config` * Description: Retrieves all SensorEnrichmentConfigs from Zookeeper http://git-wip-us.apache.org/repos/asf/metron/blob/19096fe6/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/SearchController.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/SearchController.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/SearchController.java index 6915666..d3b805f 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/SearchController.java +++ 
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/SearchController.java @@ -20,6 +20,7 @@ package org.apache.metron.rest.controller; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import io.swagger.annotations.ApiResponse; +import org.apache.metron.indexing.dao.search.FieldType; import org.apache.metron.rest.RestException; import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; @@ -32,6 +33,9 @@ import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; +import java.util.List; +import java.util.Map; + @RestController @RequestMapping("/api/v1/search") public class SearchController { @@ -45,4 +49,18 @@ public class SearchController { ResponseEntity<SearchResponse> search(final @ApiParam(name = "searchRequest", value = "Search request", required = true) @RequestBody SearchRequest searchRequest) throws RestException { return new ResponseEntity<>(searchService.search(searchRequest), HttpStatus.OK); } + + @ApiOperation(value = "Get column metadata for each index in the list of indices") + @ApiResponse(message = "Column Metadata", code = 200) + @RequestMapping(value = "/column/metadata", method = RequestMethod.POST) + ResponseEntity<Map<String, Map<String, FieldType>>> getColumnMetadata(final @ApiParam(name = "indices", value = "Indices", required = true) @RequestBody List<String> indices) throws RestException { + return new ResponseEntity<>(searchService.getColumnMetadata(indices), HttpStatus.OK); + } + + @ApiOperation(value = "Get metadata for columns shared by the list of indices") + @ApiResponse(message = "Common Column Metadata", code = 200) + @RequestMapping(value = "/column/metadata/common", method = RequestMethod.POST) + ResponseEntity<Map<String, FieldType>> getCommonColumnMetadata(final @ApiParam(name = 
"indices", value = "Indices", required = true) @RequestBody List<String> indices) throws RestException { + return new ResponseEntity<>(searchService.getCommonColumnMetadata(indices), HttpStatus.OK); + } } http://git-wip-us.apache.org/repos/asf/metron/blob/19096fe6/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SearchService.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SearchService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SearchService.java index df28d4a..b2fb2e6 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SearchService.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/SearchService.java @@ -17,12 +17,18 @@ */ package org.apache.metron.rest.service; +import org.apache.metron.indexing.dao.search.FieldType; import org.apache.metron.rest.RestException; import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; +import java.util.List; +import java.util.Map; + public interface SearchService { SearchResponse search(SearchRequest searchRequest) throws RestException; + Map<String, Map<String, FieldType>> getColumnMetadata(List<String> indices) throws RestException; + Map<String, FieldType> getCommonColumnMetadata(List<String> indices) throws RestException; } http://git-wip-us.apache.org/repos/asf/metron/blob/19096fe6/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/IndexDaoSearchServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/IndexDaoSearchServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/IndexDaoSearchServiceImpl.java index 123d6d0..5ff22c9 100644 
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/IndexDaoSearchServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/IndexDaoSearchServiceImpl.java @@ -21,11 +21,16 @@ import org.apache.metron.indexing.dao.IndexDao; import org.apache.metron.indexing.dao.search.InvalidSearchException; import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; +import org.apache.metron.indexing.dao.search.FieldType; import org.apache.metron.rest.RestException; import org.apache.metron.rest.service.SearchService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import java.io.IOException; +import java.util.List; +import java.util.Map; + @Service public class IndexDaoSearchServiceImpl implements SearchService { private IndexDao dao; @@ -44,4 +49,24 @@ public class IndexDaoSearchServiceImpl implements SearchService { throw new RestException(ise.getMessage(), ise); } } + + @Override + public Map<String, Map<String, FieldType>> getColumnMetadata(List<String> indices) throws RestException { + try { + return dao.getColumnMetadata(indices); + } + catch(IOException ioe) { + throw new RestException(ioe.getMessage(), ioe); + } + } + + @Override + public Map<String, FieldType> getCommonColumnMetadata(List<String> indices) throws RestException { + try { + return dao.getCommonColumnMetadata(indices); + } + catch(IOException ioe) { + throw new RestException(ioe.getMessage(), ioe); + } + } } http://git-wip-us.apache.org/repos/asf/metron/blob/19096fe6/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java index f1eb1ae..44d9078 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableMap; import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.indexing.dao.InMemoryDao; import org.apache.metron.indexing.dao.IndexingDaoIntegrationTest; +import org.apache.metron.indexing.dao.search.FieldType; import org.apache.metron.rest.service.SearchService; import org.json.simple.JSONArray; import org.json.simple.JSONObject; @@ -45,6 +46,7 @@ import java.util.List; import java.util.Map; import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE; +import static org.hamcrest.Matchers.hasSize; import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf; import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic; import static org.springframework.security.test.web.servlet.setup.SecurityMockMvcConfigurers.springSecurity; @@ -77,6 +79,7 @@ public class SearchControllerIntegrationTest { public void setup() throws Exception { this.mockMvc = MockMvcBuilders.webAppContextSetup(this.wac).apply(springSecurity()).build(); loadTestData(); + loadColumnTypes(); } @After @@ -176,6 +179,62 @@ public class SearchControllerIntegrationTest { .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) .andExpect(jsonPath("$.responseCode").value(500)) .andExpect(jsonPath("$.message").value("Search result size must be less than 100")); + + this.mockMvc.perform(post(searchUrl + "/column/metadata").with(httpBasic(user, 
password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content("[\"bro\",\"snort\"]")) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) + .andExpect(jsonPath("$.*", hasSize(2))) + .andExpect(jsonPath("$.bro.common_string_field").value("string")) + .andExpect(jsonPath("$.bro.common_integer_field").value("integer")) + .andExpect(jsonPath("$.bro.bro_field").value("boolean")) + .andExpect(jsonPath("$.bro.duplicate_field").value("date")) + .andExpect(jsonPath("$.snort.common_string_field").value("string")) + .andExpect(jsonPath("$.snort.common_integer_field").value("integer")) + .andExpect(jsonPath("$.snort.snort_field").value("double")) + .andExpect(jsonPath("$.snort.duplicate_field").value("long")); + + this.mockMvc.perform(post(searchUrl + "/column/metadata/common").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content("[\"bro\",\"snort\"]")) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) + .andExpect(jsonPath("$.*", hasSize(2))) + .andExpect(jsonPath("$.common_string_field").value("string")) + .andExpect(jsonPath("$.common_integer_field").value("integer")); + + this.mockMvc.perform(post(searchUrl + "/column/metadata").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content("[\"bro\"]")) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) + .andExpect(jsonPath("$.*", hasSize(1))) + .andExpect(jsonPath("$.bro.common_string_field").value("string")) + .andExpect(jsonPath("$.bro.common_integer_field").value("integer")) + .andExpect(jsonPath("$.bro.bro_field").value("boolean")) + .andExpect(jsonPath("$.bro.duplicate_field").value("date")); + + 
this.mockMvc.perform(post(searchUrl + "/column/metadata/common").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content("[\"bro\"]")) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) + .andExpect(jsonPath("$.*", hasSize(4))) + .andExpect(jsonPath("$.common_string_field").value("string")) + .andExpect(jsonPath("$.common_integer_field").value("integer")) + .andExpect(jsonPath("$.bro_field").value("boolean")) + .andExpect(jsonPath("$.duplicate_field").value("date")); + + this.mockMvc.perform(post(searchUrl + "/column/metadata").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content("[\"snort\"]")) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) + .andExpect(jsonPath("$.*", hasSize(1))) + .andExpect(jsonPath("$.snort.common_string_field").value("string")) + .andExpect(jsonPath("$.snort.common_integer_field").value("integer")) + .andExpect(jsonPath("$.snort.snort_field").value("double")) + .andExpect(jsonPath("$.snort.duplicate_field").value("long")); + + this.mockMvc.perform(post(searchUrl + "/column/metadata/common").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content("[\"snort\"]")) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) + .andExpect(jsonPath("$.*", hasSize(4))) + .andExpect(jsonPath("$.common_string_field").value("string")) + .andExpect(jsonPath("$.common_integer_field").value("integer")) + .andExpect(jsonPath("$.snort_field").value("double")) + .andExpect(jsonPath("$.duplicate_field").value("long")); } @@ -199,4 +258,21 @@ public class SearchControllerIntegrationTest { } InMemoryDao.load(backingStore); } + + private 
void loadColumnTypes() throws ParseException { + Map<String, Map<String, FieldType>> columnTypes = new HashMap<>(); + Map<String, FieldType> broTypes = new HashMap<>(); + broTypes.put("common_string_field", FieldType.STRING); + broTypes.put("common_integer_field", FieldType.INTEGER); + broTypes.put("bro_field", FieldType.BOOLEAN); + broTypes.put("duplicate_field", FieldType.DATE); + Map<String, FieldType> snortTypes = new HashMap<>(); + snortTypes.put("common_string_field", FieldType.STRING); + snortTypes.put("common_integer_field", FieldType.INTEGER); + snortTypes.put("snort_field", FieldType.DOUBLE); + snortTypes.put("duplicate_field", FieldType.LONG); + columnTypes.put("bro", broTypes); + columnTypes.put("snort", snortTypes); + InMemoryDao.setColumnMetadata(columnTypes); + } } http://git-wip-us.apache.org/repos/asf/metron/blob/19096fe6/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java index a4838b5..cb2b1ca 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java @@ -21,29 +21,54 @@ import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.IndexDao; import org.apache.metron.indexing.dao.search.*; -import org.apache.metron.indexing.dao.search.SortOrder; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; import org.elasticsearch.client.transport.TransportClient; +import 
org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.index.query.QueryStringQueryBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.*; +import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.stream.Collectors; public class ElasticsearchDao implements IndexDao { private transient TransportClient client; private AccessConfig accessConfig; + private List<String> ignoredIndices = new ArrayList<>(); protected ElasticsearchDao(TransportClient client, AccessConfig config) { this.client = client; this.accessConfig = config; + this.ignoredIndices.add(".kibana"); } public ElasticsearchDao() { //uninitialized. } + private static Map<String, FieldType> elasticsearchSearchTypeMap; + + static { + Map<String, FieldType> fieldTypeMap = new HashMap<>(); + fieldTypeMap.put("string", FieldType.STRING); + fieldTypeMap.put("ip", FieldType.IP); + fieldTypeMap.put("integer", FieldType.INTEGER); + fieldTypeMap.put("long", FieldType.LONG); + fieldTypeMap.put("date", FieldType.DATE); + fieldTypeMap.put("float", FieldType.FLOAT); + fieldTypeMap.put("double", FieldType.DOUBLE); + fieldTypeMap.put("boolean", FieldType.BOOLEAN); + elasticsearchSearchTypeMap = Collections.unmodifiableMap(fieldTypeMap); + } + @Override public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException { if(client == null) { @@ -87,4 +112,71 @@ public class ElasticsearchDao implements IndexDao { this.client = ElasticsearchUtils.getClient(globalConfig, config.getOptionalSettings()); this.accessConfig = config; } + + @SuppressWarnings("unchecked") + @Override + public Map<String, Map<String, FieldType>> getColumnMetadata(List<String> indices) throws IOException { + Map<String, 
Map<String, FieldType>> allColumnMetadata = new HashMap<>(); + ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings = + client.admin().indices().getMappings(new GetMappingsRequest().indices(getLatestIndices(indices))).actionGet().getMappings(); + for(Object index: mappings.keys().toArray()) { + Map<String, FieldType> indexColumnMetadata = new HashMap<>(); + ImmutableOpenMap<String, MappingMetaData> mapping = mappings.get(index.toString()); + Iterator<String> mappingIterator = mapping.keysIt(); + while(mappingIterator.hasNext()) { + MappingMetaData mappingMetaData = mapping.get(mappingIterator.next()); + Map<String, Map<String, String>> map = (Map<String, Map<String, String>>) mappingMetaData.getSourceAsMap().get("properties"); + for(String field: map.keySet()) { + indexColumnMetadata.put(field, elasticsearchSearchTypeMap.getOrDefault(map.get(field).get("type"), FieldType.OTHER)); + } + } + allColumnMetadata.put(index.toString().split("_index_")[0], indexColumnMetadata); + } + return allColumnMetadata; + } + + @SuppressWarnings("unchecked") + @Override + public Map<String, FieldType> getCommonColumnMetadata(List<String> indices) throws IOException { + Map<String, FieldType> commonColumnMetadata = null; + ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings = + client.admin().indices().getMappings(new GetMappingsRequest().indices(getLatestIndices(indices))).actionGet().getMappings(); + for(Object index: mappings.keys().toArray()) { + ImmutableOpenMap<String, MappingMetaData> mapping = mappings.get(index.toString()); + Iterator<String> mappingIterator = mapping.keysIt(); + while(mappingIterator.hasNext()) { + MappingMetaData mappingMetaData = mapping.get(mappingIterator.next()); + Map<String, Map<String, String>> map = (Map<String, Map<String, String>>) mappingMetaData.getSourceAsMap().get("properties"); + Map<String, FieldType> mappingsWithTypes = map.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, + e-> 
elasticsearchSearchTypeMap.getOrDefault(e.getValue().get("type"), FieldType.OTHER))); + if (commonColumnMetadata == null) { + commonColumnMetadata = mappingsWithTypes; + } else { + commonColumnMetadata.entrySet().retainAll(mappingsWithTypes.entrySet()); + } + } + } + return commonColumnMetadata; + } + + protected String[] getLatestIndices(List<String> includeIndices) { + Map<String, String> latestIndices = new HashMap<>(); + String[] indices = client.admin().indices().prepareGetIndex().setFeatures().get().getIndices(); + for (String index : indices) { + if (!ignoredIndices.contains(index)) { + int prefixEnd = index.indexOf("_index_"); + if (prefixEnd != -1) { + String prefix = index.substring(0, prefixEnd); + if (includeIndices.contains(prefix)) { + String latestIndex = latestIndices.get(prefix); + if (latestIndex == null || index.compareTo(latestIndex) > 0) { + latestIndices.put(prefix, index); + } + } + } + } + } + return latestIndices.values().toArray(new String[latestIndices.size()]); + } + } http://git-wip-us.apache.org/repos/asf/metron/blob/19096fe6/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchDaoIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchDaoIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchDaoIntegrationTest.java index d937fff..ffc41b3 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchDaoIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchDaoIntegrationTest.java @@ -18,6 +18,7 @@ package org.apache.metron.elasticsearch.integration; +import org.adrianwalker.multilinestring.Multiline; import 
org.apache.metron.elasticsearch.dao.ElasticsearchDao; import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent; import org.apache.metron.indexing.dao.AccessConfig; @@ -41,6 +42,50 @@ public class ElasticsearchDaoIntegrationTest extends IndexingDaoIntegrationTest private static String indexDir = "target/elasticsearch_search"; private static String dateFormat = "yyyy.MM.dd.HH"; + /** + * { + * "bro_doc": { + * "properties": { + * "source:type": { "type": "string" }, + * "ip_src_addr": { "type": "ip" }, + * "ip_src_port": { "type": "integer" }, + * "long_field": { "type": "long" }, + * "timestamp" : { "type": "date" }, + * "latitude" : { "type": "float" }, + * "double_field": { "type": "double" }, + * "is_alert": { "type": "boolean" }, + * "location_point": { "type": "geo_point" }, + * "bro_field": { "type": "string" }, + * "duplicate_name_field": { "type": "string" } + * } + * } + * } + */ + @Multiline + private static String broTypeMappings; + + /** + * { + * "snort_doc": { + * "properties": { + * "source:type": { "type": "string" }, + * "ip_src_addr": { "type": "ip" }, + * "ip_src_port": { "type": "integer" }, + * "long_field": { "type": "long" }, + * "timestamp" : { "type": "date" }, + * "latitude" : { "type": "float" }, + * "double_field": { "type": "double" }, + * "is_alert": { "type": "boolean" }, + * "location_point": { "type": "geo_point" }, + * "snort_field": { "type": "integer" }, + * "duplicate_name_field": { "type": "integer" } + * } + * } + * } + */ + @Multiline + private static String snortTypeMappings; + @Override protected IndexDao createDao() throws Exception { @@ -72,6 +117,11 @@ public class ElasticsearchDaoIntegrationTest extends IndexingDaoIntegrationTest @Override protected void loadTestData() throws ParseException { ElasticSearchComponent es = (ElasticSearchComponent)indexComponent; + es.getClient().admin().indices().prepareCreate("bro_index_2017.01.01.01") + .addMapping("bro_doc", broTypeMappings).get(); + 
es.getClient().admin().indices().prepareCreate("snort_index_2017.01.01.02") + .addMapping("snort_doc", snortTypeMappings).get(); + BulkRequestBuilder bulkRequest = es.getClient().prepareBulk().setRefresh(true); JSONArray broArray = (JSONArray) new JSONParser().parse(broData); for(Object o: broArray) { http://git-wip-us.apache.org/repos/asf/metron/blob/19096fe6/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java index a835d65..31fe74e 100644 --- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java @@ -20,10 +20,15 @@ package org.apache.metron.indexing.dao; import org.apache.metron.indexing.dao.search.InvalidSearchException; import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; +import org.apache.metron.indexing.dao.search.FieldType; +import java.io.IOException; +import java.util.List; import java.util.Map; public interface IndexDao { SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException; void init(Map<String, Object> globalConfig, AccessConfig config); + Map<String, Map<String, FieldType>> getColumnMetadata(List<String> indices) throws IOException; + Map<String, FieldType> getCommonColumnMetadata(List<String> indices) throws IOException; } http://git-wip-us.apache.org/repos/asf/metron/blob/19096fe6/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/FieldType.java ---------------------------------------------------------------------- diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/FieldType.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/FieldType.java new file mode 100644 index 0000000..5848cb3 --- /dev/null +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/FieldType.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.indexing.dao.search; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public enum FieldType { + @JsonProperty("string") + STRING("string"), + @JsonProperty("ip") + IP("ip"), + @JsonProperty("integer") + INTEGER("integer"), + @JsonProperty("long") + LONG("long"), + @JsonProperty("date") + DATE("date"), + @JsonProperty("float") + FLOAT("float"), + @JsonProperty("double") + DOUBLE("double"), + @JsonProperty("boolean") + BOOLEAN("boolean"), + @JsonProperty("other") + OTHER("other"); + + + private String fieldType; + + FieldType(String fieldType) { + this.fieldType = fieldType; + } + + public String getFieldType() { + return fieldType; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/19096fe6/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java index 444a9da..ab83c7e 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java @@ -30,6 +30,7 @@ import java.util.*; public class InMemoryDao implements IndexDao { public static Map<String, List<String>> BACKING_STORE = new HashMap<>(); + public static Map<String, Map<String, FieldType>> COLUMN_METADATA; private AccessConfig config; @Override @@ -132,11 +133,42 @@ public class InMemoryDao implements IndexDao { this.config = config; } + @Override + public Map<String, Map<String, FieldType>> getColumnMetadata(List<String> indices) throws IOException { + Map<String, Map<String, FieldType>> columnMetadata = new HashMap<>(); + for(String index: indices) { + columnMetadata.put(index, new HashMap<>(COLUMN_METADATA.get(index))); 
+ } + return columnMetadata; + } + + @Override + public Map<String, FieldType> getCommonColumnMetadata(List<String> indices) throws IOException { + Map<String, FieldType> commonColumnMetadata = new HashMap<>(); + for(String index: indices) { + if (commonColumnMetadata.isEmpty()) { + commonColumnMetadata = new HashMap<>(COLUMN_METADATA.get(index)); + } else { + commonColumnMetadata.entrySet().retainAll(COLUMN_METADATA.get(index).entrySet()); + } + } + return commonColumnMetadata; + } + + public static void setColumnMetadata(Map<String, Map<String, FieldType>> columnMetadata) { + Map<String, Map<String, FieldType>> columnMetadataMap = new HashMap<>(); + for (Map.Entry<String, Map<String, FieldType>> e: columnMetadata.entrySet()) { + columnMetadataMap.put(e.getKey(), Collections.unmodifiableMap(e.getValue())); + } + COLUMN_METADATA = columnMetadataMap; + } + public static void load(Map<String, List<String>> backingStore) { BACKING_STORE = backingStore; } public static void clear() { BACKING_STORE.clear(); + COLUMN_METADATA.clear(); } } http://git-wip-us.apache.org/repos/asf/metron/blob/19096fe6/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/IndexingDaoIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/IndexingDaoIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/IndexingDaoIntegrationTest.java index 8b5baef..209c234 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/IndexingDaoIntegrationTest.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/IndexingDaoIntegrationTest.java @@ -19,6 +19,7 @@ package org.apache.metron.indexing.dao; import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.indexing.dao.search.FieldType; 
import org.apache.metron.indexing.dao.search.InvalidSearchException; import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; @@ -27,16 +28,20 @@ import org.apache.metron.integration.InMemoryComponent; import org.json.simple.parser.ParseException; import org.junit.*; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Map; public abstract class IndexingDaoIntegrationTest { /** * [ - * {"source:type": "bro", "ip_src_addr":"192.168.1.1", "ip_src_port": 8010, "timestamp":1, "rejected":true}, - * {"source:type": "bro" "ip_src_addr":"192.168.1.2", "ip_src_port": 8009, "timestamp":2, "rejected":false}, - * {"source:type": "bro" "ip_src_addr":"192.168.1.3", "ip_src_port": 8008, "timestamp":3, "rejected":true}, - * {"source:type": "bro" "ip_src_addr":"192.168.1.4", "ip_src_port": 8007, "timestamp":4, "rejected":false}, - * {"source:type": "bro" "ip_src_addr":"192.168.1.5", "ip_src_port": 8006, "timestamp":5, "rejected":true} + * {"source:type": "bro", "ip_src_addr":"192.168.1.1", "ip_src_port": 8010, "long_field": 10000, "timestamp":1, "latitude": 48.5839, "double_field": 1.00001, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 1", "duplicate_name_field": "data 1"}, + * {"source:type": "bro", "ip_src_addr":"192.168.1.2", "ip_src_port": 8009, "long_field": 20000, "timestamp":2, "latitude": 48.0001, "double_field": 1.00002, "is_alert":false, "location_point": "48.5839,7.7455", "bro_field": "bro data 2", "duplicate_name_field": "data 2"}, + * {"source:type": "bro", "ip_src_addr":"192.168.1.3", "ip_src_port": 8008, "long_field": 10000, "timestamp":3, "latitude": 48.5839, "double_field": 1.00003, "is_alert":true, "location_point": "50.0,7.7455", "bro_field": "bro data 3", "duplicate_name_field": "data 3"}, + * {"source:type": "bro", "ip_src_addr":"192.168.1.4", "ip_src_port": 8007, "long_field": 10000, 
"timestamp":4, "latitude": 48.5839, "double_field": 1.00004, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 4", "duplicate_name_field": "data 4"}, + * {"source:type": "bro", "ip_src_addr":"192.168.1.5", "ip_src_port": 8006, "long_field": 10000, "timestamp":5, "latitude": 48.5839, "double_field": 1.00005, "is_alert":true, "location_point": "48.5839,7.7455", "bro_field": "bro data 5", "duplicate_name_field": "data 5"} * ] */ @Multiline @@ -44,11 +49,11 @@ public abstract class IndexingDaoIntegrationTest { /** * [ - * {"source:type": "snort" "ip_src_addr":"192.168.1.6", "ip_src_port": 8005, "timestamp":6, "is_alert":false}, - * {"source:type": "snort" "ip_src_addr":"192.168.1.1", "ip_src_port": 8004, "timestamp":7, "is_alert":true}, - * {"source:type": "snort" "ip_src_addr":"192.168.1.7", "ip_src_port": 8003, "timestamp":8, "is_alert":false}, - * {"source:type": "snort" "ip_src_addr":"192.168.1.1", "ip_src_port": 8002, "timestamp":9, "is_alert":true}, - * {"source:type": "snort" "ip_src_addr":"192.168.1.8", "ip_src_port": 8001, "timestamp":10, "is_alert":false} + * {"source:type": "snort", "ip_src_addr":"192.168.1.6", "ip_src_port": 8005, "long_field": 10000, "timestamp":6, "latitude": 48.5839, "double_field": 1.00001, "is_alert":false, "location_point": "50.0,7.7455", "snort_field": 10, "duplicate_name_field": 1}, + * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8004, "long_field": 10000, "timestamp":7, "latitude": 48.5839, "double_field": 1.00002, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 20, "duplicate_name_field": 2}, + * {"source:type": "snort", "ip_src_addr":"192.168.1.7", "ip_src_port": 8003, "long_field": 10000, "timestamp":8, "latitude": 48.5839, "double_field": 1.00003, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 30, "duplicate_name_field": 3}, + * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8002, "long_field": 20000, 
"timestamp":9, "latitude": 48.0001, "double_field": 1.00004, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 40, "duplicate_name_field": 4}, + * {"source:type": "snort", "ip_src_addr":"192.168.1.8", "ip_src_port": 8001, "long_field": 10000, "timestamp":10, "latitude": 48.5839, "double_field": 1.00005, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 50, "duplicate_name_field": 5} * ] */ @Multiline @@ -242,6 +247,82 @@ public abstract class IndexingDaoIntegrationTest { Assert.assertEquals("Search result size must be less than 100", ise.getMessage()); } } + // getColumnMetadata with multiple indices + { + Map<String, Map<String, FieldType>> fieldTypes = dao.getColumnMetadata(Arrays.asList("bro", "snort")); + Assert.assertEquals(2, fieldTypes.size()); + Map<String, FieldType> broTypes = fieldTypes.get("bro"); + Assert.assertEquals(11, broTypes.size()); + Assert.assertEquals(FieldType.STRING, broTypes.get("source:type")); + Assert.assertEquals(FieldType.IP, broTypes.get("ip_src_addr")); + Assert.assertEquals(FieldType.INTEGER, broTypes.get("ip_src_port")); + Assert.assertEquals(FieldType.LONG, broTypes.get("long_field")); + Assert.assertEquals(FieldType.DATE, broTypes.get("timestamp")); + Assert.assertEquals(FieldType.FLOAT, broTypes.get("latitude")); + Assert.assertEquals(FieldType.DOUBLE, broTypes.get("double_field")); + Assert.assertEquals(FieldType.BOOLEAN, broTypes.get("is_alert")); + Assert.assertEquals(FieldType.OTHER, broTypes.get("location_point")); + Assert.assertEquals(FieldType.STRING, broTypes.get("bro_field")); + Assert.assertEquals(FieldType.STRING, broTypes.get("duplicate_name_field")); + Map<String, FieldType> snortTypes = fieldTypes.get("snort"); + Assert.assertEquals(11, snortTypes.size()); + Assert.assertEquals(FieldType.STRING, snortTypes.get("source:type")); + Assert.assertEquals(FieldType.IP, snortTypes.get("ip_src_addr")); + Assert.assertEquals(FieldType.INTEGER, snortTypes.get("ip_src_port")); + 
Assert.assertEquals(FieldType.LONG, snortTypes.get("long_field")); + Assert.assertEquals(FieldType.DATE, snortTypes.get("timestamp")); + Assert.assertEquals(FieldType.FLOAT, snortTypes.get("latitude")); + Assert.assertEquals(FieldType.DOUBLE, snortTypes.get("double_field")); + Assert.assertEquals(FieldType.BOOLEAN, snortTypes.get("is_alert")); + Assert.assertEquals(FieldType.OTHER, snortTypes.get("location_point")); + Assert.assertEquals(FieldType.INTEGER, snortTypes.get("snort_field")); + Assert.assertEquals(FieldType.INTEGER, snortTypes.get("duplicate_name_field")); + } + // getColumnMetadata with only bro + { + Map<String, Map<String, FieldType>> fieldTypes = dao.getColumnMetadata(Collections.singletonList("bro")); + Assert.assertEquals(1, fieldTypes.size()); + Map<String, FieldType> broTypes = fieldTypes.get("bro"); + Assert.assertEquals(11, broTypes.size()); + Assert.assertEquals(FieldType.STRING, broTypes.get("bro_field")); + } + // getColumnMetadata with only snort + { + Map<String, Map<String, FieldType>> fieldTypes = dao.getColumnMetadata(Collections.singletonList("snort")); + Assert.assertEquals(1, fieldTypes.size()); + Map<String, FieldType> snortTypes = fieldTypes.get("snort"); + Assert.assertEquals(11, snortTypes.size()); + Assert.assertEquals(FieldType.INTEGER, snortTypes.get("snort_field")); + } + // getCommonColumnMetadata with multiple Indices + { + Map<String, FieldType> fieldTypes = dao.getCommonColumnMetadata(Arrays.asList("bro", "snort")); + // Should only return fields in both + Assert.assertEquals(9, fieldTypes.size()); + Assert.assertEquals(FieldType.STRING, fieldTypes.get("source:type")); + Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr")); + Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port")); + Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field")); + Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp")); + Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude")); + 
Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("double_field")); + Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert")); + Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point")); + } + // getCommonColumnMetadata with only bro + { + Map<String, FieldType> fieldTypes = dao.getCommonColumnMetadata(Collections.singletonList("bro")); + Assert.assertEquals(11, fieldTypes.size()); + Assert.assertEquals(FieldType.STRING, fieldTypes.get("bro_field")); + Assert.assertEquals(FieldType.STRING, fieldTypes.get("duplicate_name_field")); + } + // getCommonColumnMetadata with only snort + { + Map<String, FieldType> fieldTypes = dao.getCommonColumnMetadata(Collections.singletonList("snort")); + Assert.assertEquals(11, fieldTypes.size()); + Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("snort_field")); + Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("duplicate_name_field")); + } } @After
