METRON-1301 Alerts UI - Sorting on Triage Score Unexpectedly Filters Some Records (nickwallen) closes apache/metron#832
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/4a089900 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/4a089900 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/4a089900 Branch: refs/heads/master Commit: 4a089900ad477b8e261ab2a2a7d4cafeaec21eca Parents: 768a6fa Author: nickwallen <[email protected]> Authored: Fri Nov 17 14:45:27 2017 -0500 Committer: nickallen <[email protected]> Committed: Fri Nov 17 14:45:27 2017 -0500 ---------------------------------------------------------------------- .../CURRENT/package/files/bro_index.template | 8 +- .../CURRENT/package/files/snort_index.template | 39 +- .../CURRENT/package/files/yaf_index.template | 6 +- .../apache/metron/rest/config/IndexConfig.java | 6 + .../SearchControllerIntegrationTest.java | 32 +- .../elasticsearch/dao/ColumnMetadataDao.java | 67 +++ .../dao/ElasticsearchColumnMetadataDao.java | 179 ++++++++ .../elasticsearch/dao/ElasticsearchDao.java | 426 +++++++++++-------- .../dao/ElasticsearchRequestSubmitter.java | 138 ++++++ .../elasticsearch/utils/ElasticsearchUtils.java | 52 +++ .../dao/ElasticsearchColumnMetadataDaoTest.java | 144 +++++++ .../elasticsearch/dao/ElasticsearchDaoTest.java | 240 +++++++---- .../dao/ElasticsearchRequestSubmitterTest.java | 121 ++++++ .../ElasticsearchSearchIntegrationTest.java | 154 ++++--- .../matcher/SearchRequestMatcher.java | 93 ---- .../metron/indexing/dao/AccessConfig.java | 16 +- .../apache/metron/indexing/dao/InMemoryDao.java | 31 +- .../indexing/dao/SearchIntegrationTest.java | 154 ++++++- 18 files changed, 1449 insertions(+), 457 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/4a089900/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template index 7db006e..3a68d75 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template @@ -98,7 +98,7 @@ "mapping": { "type": "float" }, - "match": "threat.triage.rules:*:score", + "match": "threat:triage:*score", "match_mapping_type": "*" } }, @@ -107,7 +107,7 @@ "mapping": { "type": "string" }, - "match": "threat.triage.rules:*:reason", + "match": "threat:triage:rules:*:reason", "match_mapping_type": "*" } }, @@ -116,9 +116,9 @@ "mapping": { "type": "string" }, - "match": "threat.triage.rules:*:name", + "match": "threat:triage:rules:*:name", "match_mapping_type": "*" - } + } } ], "properties": { http://git-wip-us.apache.org/repos/asf/metron/blob/4a089900/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/snort_index.template ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/snort_index.template b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/snort_index.template index f13a9ee..7c6b401 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/snort_index.template +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/snort_index.template @@ -98,28 +98,28 @@ "mapping": { "type": "float" }, - "match": "threat.triage.rules:*:score", + "match": "threat:triage:*score", "match_mapping_type": "*" } }, - { - "threat_triage_reason": { - "mapping": { - "type": "string" - }, - "match": "threat.triage.rules:*:reason", - "match_mapping_type": "*" - } - }, - { - "threat_triage_name": { - "mapping": { - "type": "string" - }, - "match": "threat.triage.rules:*:name", - "match_mapping_type": "*" - } + { + "threat_triage_reason": { + "mapping": { + "type": "string" + }, + "match": "threat:triage:rules:*:reason", + "match_mapping_type": "*" } + }, + { + "threat_triage_name": { + "mapping": { + "type": "string" + }, + "match": "threat:triage:rules:*:name", + "match_mapping_type": "*" + } + } ], "properties": { "timestamp": { @@ -195,9 +195,6 @@ "tcpwindow": { "type": "string" }, - "threat:triage:level": { - "type": "double" - }, "tos": { "type": "integer" }, http://git-wip-us.apache.org/repos/asf/metron/blob/4a089900/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/yaf_index.template ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/yaf_index.template b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/yaf_index.template index d84235d..d100eb0 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/yaf_index.template +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/yaf_index.template @@ -98,7 +98,7 @@ "mapping": { "type": "float" }, - "match": "threat.triage.rules:*:score", + "match": "threat:triage:*score", "match_mapping_type": "*" } }, @@ -107,7 +107,7 @@ "mapping": { "type": "string" }, - "match": "threat.triage.rules:*:reason", + "match": "threat:triage:rules:*:reason", "match_mapping_type": "*" } }, @@ -116,7 +116,7 @@ "mapping": { "type": "string" }, - "match": "threat.triage.rules:*:name", + "match": "threat:triage:rules:*:name", "match_mapping_type": "*" } } http://git-wip-us.apache.org/repos/asf/metron/blob/4a089900/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java index 4ce9644..25bb809 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java @@ -34,6 +34,10 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.env.Environment; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + @Configuration public class IndexConfig { @@ -57,6 +61,7 @@ public class IndexConfig { int searchMaxGroups = environment.getProperty(MetronRestConstants.SEARCH_MAX_GROUPS, Integer.class, 1000); String metaDaoImpl = environment.getProperty(MetronRestConstants.META_DAO_IMPL, String.class, null); String metaDaoSort = environment.getProperty(MetronRestConstants.META_DAO_SORT, String.class, null); + AccessConfig config = new AccessConfig(); config.setMaxSearchResults(searchMaxResults); config.setMaxSearchGroups(searchMaxGroups); @@ -84,6 +89,7 @@ public class IndexConfig { MetaAlertDao ret = (MetaAlertDao) IndexDaoFactory.create(metaDaoImpl, config).get(0); ret.init(indexDao, Optional.ofNullable(metaDaoSort)); return ret; + } catch(RuntimeException re) { throw re; http://git-wip-us.apache.org/repos/asf/metron/blob/4a089900/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java index 3673654..78a1e20 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java @@ -89,8 +89,8 @@ public class SearchControllerIntegrationTest extends DaoControllerTest { public void setup() throws Exception { this.mockMvc = MockMvcBuilders.webAppContextSetup(this.wac).apply(springSecurity()).build(); ImmutableMap<String, String> testData = ImmutableMap.of( - "bro_index_2017.01.01.01", SearchIntegrationTest.broData, - "snort_index_2017.01.01.01", SearchIntegrationTest.snortData + "bro_index_2017.01.01.01", SearchIntegrationTest.broData, + "snort_index_2017.01.01.01", SearchIntegrationTest.snortData ); loadTestData(testData); loadColumnTypes(); @@ -114,19 +114,19 @@ public class SearchControllerIntegrationTest extends DaoControllerTest { }}); assertEventually(() -> this.mockMvc.perform(post(searchUrl + "/search").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(defaultQuery)) - .andExpect(status().isOk()) - .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) - .andExpect(jsonPath("$.total").value(5)) - .andExpect(jsonPath("$.results[0].source.source:type").value("bro")) - .andExpect(jsonPath("$.results[0].source.timestamp").value(5)) - .andExpect(jsonPath("$.results[1].source.source:type").value("bro")) - .andExpect(jsonPath("$.results[1].source.timestamp").value(4)) - .andExpect(jsonPath("$.results[2].source.source:type").value("bro")) - .andExpect(jsonPath("$.results[2].source.timestamp").value(3)) - .andExpect(jsonPath("$.results[3].source.source:type").value("bro")) - .andExpect(jsonPath("$.results[3].source.timestamp").value(2)) - .andExpect(jsonPath("$.results[4].source.source:type").value("bro")) - .andExpect(jsonPath("$.results[4].source.timestamp").value(1)) + .andExpect(status().isOk()) + .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) + .andExpect(jsonPath("$.total").value(5)) + .andExpect(jsonPath("$.results[0].source.source:type").value("bro")) + .andExpect(jsonPath("$.results[0].source.timestamp").value(5)) + .andExpect(jsonPath("$.results[1].source.source:type").value("bro")) + .andExpect(jsonPath("$.results[1].source.timestamp").value(4)) + .andExpect(jsonPath("$.results[2].source.source:type").value("bro")) + .andExpect(jsonPath("$.results[2].source.timestamp").value(3)) + .andExpect(jsonPath("$.results[3].source.source:type").value("bro")) + .andExpect(jsonPath("$.results[3].source.timestamp").value(2)) + .andExpect(jsonPath("$.results[4].source.source:type").value("bro")) + .andExpect(jsonPath("$.results[4].source.timestamp").value(1)) ); sensorIndexingConfigService.delete("bro"); @@ -288,4 +288,4 @@ public class SearchControllerIntegrationTest extends DaoControllerTest { columnTypes.put("snort", snortTypes); InMemoryDao.setColumnMetadata(columnTypes); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metron/blob/4a089900/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ColumnMetadataDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ColumnMetadataDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ColumnMetadataDao.java new file mode 100644 index 0000000..0393629 --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ColumnMetadataDao.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.elasticsearch.dao; + +import org.apache.metron.indexing.dao.search.FieldType; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * Responsible for retrieving column-level metadata about search indices. + */ +public interface ColumnMetadataDao { + + /** + * Retrieves column metadata for one or more search indices. + * @param indices The search indices to retrieve column metadata for. + * @return The column metadata, one set for each search index. + * @throws IOException + */ + Map<String, FieldType> getColumnMetadata(List<String> indices) throws IOException; + + /** + * Finds the latest version of a set of base indices. This can be used to find + * the latest 'bro' index, for example. + * + * Assuming the following indices exist... + * + * [ + * 'bro_index_2017.10.03.19' + * 'bro_index_2017.10.03.20', + * 'bro_index_2017.10.03.21', + * 'snort_index_2017.10.03.19', + * 'snort_index_2017.10.03.20', + * 'snort_index_2017.10.03.21' + * ] + * + * And the include indices are given as... + * + * ['bro', 'snort'] + * + * Then the latest indices are... + * + * ['bro_index_2017.10.03.21', 'snort_index_2017.10.03.21'] + * + * @param includeIndices The base names of the indices to include + * @return The latest version of a set of indices. + */ + String[] getLatestIndices(List<String> includeIndices); +} http://git-wip-us.apache.org/repos/asf/metron/blob/4a089900/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java new file mode 100644 index 0000000..8e210b4 --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.elasticsearch.dao; + +import org.apache.metron.indexing.dao.search.FieldType; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; +import org.elasticsearch.client.AdminClient; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER; + +/** + * Responsible for retrieving column-level metadata for Elasticsearch search indices. + */ +public class ElasticsearchColumnMetadataDao implements ColumnMetadataDao { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static Map<String, FieldType> elasticsearchTypeMap; + static { + Map<String, FieldType> fieldTypeMap = new HashMap<>(); + fieldTypeMap.put("string", FieldType.STRING); + fieldTypeMap.put("ip", FieldType.IP); + fieldTypeMap.put("integer", FieldType.INTEGER); + fieldTypeMap.put("long", FieldType.LONG); + fieldTypeMap.put("date", FieldType.DATE); + fieldTypeMap.put("float", FieldType.FLOAT); + fieldTypeMap.put("double", FieldType.DOUBLE); + fieldTypeMap.put("boolean", FieldType.BOOLEAN); + elasticsearchTypeMap = Collections.unmodifiableMap(fieldTypeMap); + } + + /** + * An Elasticsearch administrative client. + */ + private transient AdminClient adminClient; + + /** + * @param adminClient The Elasticsearch admin client. + */ + public ElasticsearchColumnMetadataDao(AdminClient adminClient) { + this.adminClient = adminClient; + } + + @SuppressWarnings("unchecked") + @Override + public Map<String, FieldType> getColumnMetadata(List<String> indices) throws IOException { + Map<String, FieldType> indexColumnMetadata = new HashMap<>(); + Map<String, String> previousIndices = new HashMap<>(); + Set<String> fieldBlackList = new HashSet<>(); + + String[] latestIndices = getLatestIndices(indices); + if (latestIndices.length > 0) { + ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings = adminClient + .indices() + .getMappings(new GetMappingsRequest().indices(latestIndices)) + .actionGet() + .getMappings(); + + // for each index + for (Object key : mappings.keys().toArray()) { + String indexName = key.toString(); + ImmutableOpenMap<String, MappingMetaData> mapping = mappings.get(indexName); + + // for each mapping in the index + Iterator<String> mappingIterator = mapping.keysIt(); + while (mappingIterator.hasNext()) { + MappingMetaData mappingMetaData = mapping.get(mappingIterator.next()); + Map<String, Map<String, String>> map = (Map<String, Map<String, String>>) mappingMetaData + .getSourceAsMap().get("properties"); + + // for each field in the mapping + for (String field : map.keySet()) { + if (!fieldBlackList.contains(field)) { + FieldType type = toFieldType(map.get(field).get("type")); + + if(!indexColumnMetadata.containsKey(field)) { + indexColumnMetadata.put(field, type); + + // record the last index in which a field exists, to be able to print helpful error message on type mismatch + previousIndices.put(field, indexName); + + } else { + FieldType previousType = indexColumnMetadata.get(field); + if (!type.equals(previousType)) { + String previousIndexName = previousIndices.get(field); + LOG.error(String.format( + "Field type mismatch: %s.%s has type %s while %s.%s has type %s. Defaulting type to %s.", + indexName, field, type.getFieldType(), + previousIndexName, field, previousType.getFieldType(), + FieldType.OTHER.getFieldType())); + indexColumnMetadata.put(field, FieldType.OTHER); + + // the field is defined in multiple indices with different types; ignore the field as type has been set to OTHER + fieldBlackList.add(field); + } + } + } + } + } + } + } else { + LOG.info(String.format("Unable to find any latest indices; indices=%s", indices)); + } + + return indexColumnMetadata; + + } + + /** + * Retrieves the latest indices. + * @param includeIndices + * @return + */ + @Override + public String[] getLatestIndices(List<String> includeIndices) { + LOG.debug("Getting latest indices; indices={}", includeIndices); + Map<String, String> latestIndices = new HashMap<>(); + String[] indices = adminClient + .indices() + .prepareGetIndex() + .setFeatures() + .get() + .getIndices(); + + for (String index : indices) { + int prefixEnd = index.indexOf(INDEX_NAME_DELIMITER); + if (prefixEnd != -1) { + String prefix = index.substring(0, prefixEnd); + if (includeIndices.contains(prefix)) { + String latestIndex = latestIndices.get(prefix); + if (latestIndex == null || index.compareTo(latestIndex) > 0) { + latestIndices.put(prefix, index); + } + } + } + } + + return latestIndices.values().toArray(new String[latestIndices.size()]); + } + + /** + * Converts a string type to the corresponding FieldType. + * @param type The type to convert. + * @return The corresponding FieldType or FieldType.OTHER, if no match. + */ + private FieldType toFieldType(String type) { + return elasticsearchTypeMap.getOrDefault(type, FieldType.OTHER); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/4a089900/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java index 87ad7f7..910c09b 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java @@ -17,26 +17,8 @@ */ package org.apache.metron.elasticsearch.dao; -import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER; - import com.google.common.base.Splitter; import com.google.common.collect.Iterables; -import java.io.IOException; -import java.lang.invoke.MethodHandles; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.function.Function; -import java.util.stream.Collectors; import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.IndexDao; @@ -52,19 +34,16 @@ import org.apache.metron.indexing.dao.search.InvalidSearchException; import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; import org.apache.metron.indexing.dao.search.SearchResult; +import org.apache.metron.indexing.dao.search.SortField; import org.apache.metron.indexing.dao.search.SortOrder; import org.apache.metron.indexing.dao.update.Document; import org.elasticsearch.action.ActionWriteResponse.ShardInfo; -import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.cluster.metadata.MappingMetaData; -import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.index.mapper.ip.IpFieldMapper; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; @@ -80,18 +59,64 @@ import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder; import org.elasticsearch.search.aggregations.metrics.sum.Sum; import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.FieldSortBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; + +import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER; public class ElasticsearchDao implements IndexDao { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + /** + * The value required to ensure that Elasticsearch sorts missing values last. + */ + private static final String SORT_MISSING_LAST = "_last"; + + /** + * The value required to ensure that Elasticsearch sorts missing values last. + */ + private static final String SORT_MISSING_FIRST = "_first"; + + /** + * The Elasticsearch client. + */ private transient TransportClient client; + + /** + * Retrieves column metadata about search indices. + */ + private ColumnMetadataDao columnMetadataDao; + + /** + * Handles the submission of search requests to Elasticsearch. + */ + private ElasticsearchRequestSubmitter requestSubmitter; + private AccessConfig accessConfig; - protected ElasticsearchDao(TransportClient client, AccessConfig config) { + protected ElasticsearchDao(TransportClient client, + ColumnMetadataDao columnMetadataDao, + ElasticsearchRequestSubmitter requestSubmitter, + AccessConfig config) { this.client = client; + this.columnMetadataDao = columnMetadataDao; + this.requestSubmitter = requestSubmitter; this.accessConfig = config; } @@ -99,21 +124,6 @@ public class ElasticsearchDao implements IndexDao { //uninitialized. } - private static Map<String, FieldType> elasticsearchSearchTypeMap; - - static { - Map<String, FieldType> fieldTypeMap = new HashMap<>(); - fieldTypeMap.put("string", FieldType.STRING); - fieldTypeMap.put("ip", FieldType.IP); - fieldTypeMap.put("integer", FieldType.INTEGER); - fieldTypeMap.put("long", FieldType.LONG); - fieldTypeMap.put("date", FieldType.DATE); - fieldTypeMap.put("float", FieldType.FLOAT); - fieldTypeMap.put("double", FieldType.DOUBLE); - fieldTypeMap.put("boolean", FieldType.BOOLEAN); - elasticsearchSearchTypeMap = Collections.unmodifiableMap(fieldTypeMap); - } - @Override public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException { return search(searchRequest, new QueryStringQueryBuilder(searchRequest.getQuery())); @@ -121,56 +131,139 @@ public class ElasticsearchDao implements IndexDao { /** * Defers to a provided {@link org.elasticsearch.index.query.QueryBuilder} for the query. - * @param searchRequest The request defining the parameters of the search + * @param request The request defining the parameters of the search * @param queryBuilder The actual query to be run. Intended for if the SearchRequest requires wrapping * @return The results of the query * @throws InvalidSearchException When the query is malformed or the current state doesn't allow search */ - protected SearchResponse search(SearchRequest searchRequest, QueryBuilder queryBuilder) throws InvalidSearchException { + protected SearchResponse search(SearchRequest request, QueryBuilder queryBuilder) throws InvalidSearchException { + org.elasticsearch.action.search.SearchRequest esRequest; + org.elasticsearch.action.search.SearchResponse esResponse; + if(client == null) { throw new InvalidSearchException("Uninitialized Dao! You must call init() prior to use."); } - if (searchRequest.getSize() > accessConfig.getMaxSearchResults()) { + + if (request.getSize() > accessConfig.getMaxSearchResults()) { throw new InvalidSearchException("Search result size must be less than " + accessConfig.getMaxSearchResults()); } - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() + + esRequest = buildSearchRequest(request, queryBuilder); + esResponse = requestSubmitter.submitSearch(esRequest); + return buildSearchResponse(request, esResponse); + } + + /** + * Builds an Elasticsearch search request. + * @param searchRequest The Metron search request. + * @param queryBuilder + * @return An Elasticsearch search request. + */ + private org.elasticsearch.action.search.SearchRequest buildSearchRequest( + SearchRequest searchRequest, + QueryBuilder queryBuilder) throws InvalidSearchException { + + LOG.debug("Got search request; request={}", ElasticsearchUtils.toJSON(searchRequest).orElse("???")); + SearchSourceBuilder searchBuilder = new SearchSourceBuilder() .size(searchRequest.getSize()) .from(searchRequest.getFrom()) .query(queryBuilder) .trackScores(true); - searchRequest.getSort().forEach(sortField -> searchSourceBuilder.sort(sortField.getField(), getElasticsearchSortOrder(sortField.getSortOrder()))); - Optional<List<String>> fields = searchRequest.getFields(); - if (fields.isPresent()) { - searchSourceBuilder.fields(fields.get()); - } else { - searchSourceBuilder.fetchSource(true); + + // column metadata needed to understand the type of each sort field + Map<String, FieldType> meta; + try { + meta = getColumnMetadata(searchRequest.getIndices()); + } catch(IOException e) { + throw new InvalidSearchException("Unable to get column metadata", e); } - Optional<List<String>> facetFields = searchRequest.getFacetFields(); - if (facetFields.isPresent()) { - facetFields.get().forEach(field -> searchSourceBuilder.aggregation(new TermsBuilder(getFacentAggregationName(field)).field(field))); + + // handle sort fields + for(SortField sortField : searchRequest.getSort()) { + + // what type is the sort field? + FieldType sortFieldType = meta.getOrDefault(sortField.getField(), FieldType.OTHER); + + // sort order - if ascending missing values sorted last. otherwise, missing values sorted first + org.elasticsearch.search.sort.SortOrder sortOrder = getElasticsearchSortOrder(sortField.getSortOrder()); + String missingSortOrder; + if(sortOrder == org.elasticsearch.search.sort.SortOrder.DESC) { + missingSortOrder = SORT_MISSING_LAST; + } else { + missingSortOrder = SORT_MISSING_FIRST; + } + + // sort by the field - missing fields always last + FieldSortBuilder sortBy = new FieldSortBuilder(sortField.getField()) + .order(sortOrder) + .missing(missingSortOrder) + .unmappedType(sortFieldType.getFieldType()); + searchBuilder.sort(sortBy); } - String[] wildcardIndices = wildcardIndices(searchRequest.getIndices()); - org.elasticsearch.action.search.SearchResponse elasticsearchResponse; - try { - elasticsearchResponse = client.search(new org.elasticsearch.action.search.SearchRequest(wildcardIndices) - .source(searchSourceBuilder)).actionGet(); - } catch (SearchPhaseExecutionException e) { - LOG.error("Could not execute search", e); - throw new InvalidSearchException("Could not execute search", e); + + // handle search fields + if (searchRequest.getFields().isPresent()) { + searchBuilder.fields(searchRequest.getFields().get()); + } else { + searchBuilder.fetchSource(true); } + + // handle facet fields + if (searchRequest.getFacetFields().isPresent()) { + for(String field : searchRequest.getFacetFields().get()) { + String name = getFacentAggregationName(field); + TermsBuilder terms = new TermsBuilder(name).field(field); + searchBuilder.aggregation(terms); + } + } + + // return the search request + String[] indices = wildcardIndices(searchRequest.getIndices()); + LOG.debug("Built Elasticsearch request; indices={}, request={}", indices, searchBuilder.toString()); + return new org.elasticsearch.action.search.SearchRequest() + .indices(indices) + .source(searchBuilder); + } + + /** + * Builds a search response. + * + * This effectively transforms an Elasticsearch search response into a Metron search response. + * + * @param searchRequest The Metron search request. + * @param esResponse The Elasticsearch search response. + * @return A Metron search response. + * @throws InvalidSearchException + */ + private SearchResponse buildSearchResponse( + SearchRequest searchRequest, + org.elasticsearch.action.search.SearchResponse esResponse) throws InvalidSearchException { + SearchResponse searchResponse = new SearchResponse(); - searchResponse.setTotal(elasticsearchResponse.getHits().getTotalHits()); - searchResponse.setResults(Arrays.stream(elasticsearchResponse.getHits().getHits()).map(searchHit -> - getSearchResult(searchHit, fields.isPresent())).collect(Collectors.toList())); - if (facetFields.isPresent()) { + searchResponse.setTotal(esResponse.getHits().getTotalHits()); + + // search hits --> search results + List<SearchResult> results = new ArrayList<>(); + for(SearchHit hit: esResponse.getHits().getHits()) { + results.add(getSearchResult(hit, searchRequest.getFields().isPresent())); + } + searchResponse.setResults(results); + + // handle facet fields + if (searchRequest.getFacetFields().isPresent()) { + List<String> facetFields = searchRequest.getFacetFields().get(); Map<String, FieldType> commonColumnMetadata; try { commonColumnMetadata = getColumnMetadata(searchRequest.getIndices()); } catch (IOException e) { - throw new InvalidSearchException(String.format("Could not get common column metadata for indices %s", Arrays.toString(searchRequest.getIndices().toArray()))); + throw new InvalidSearchException(String.format( + "Could not get common column metadata for indices %s", + Arrays.toString(searchRequest.getIndices().toArray()))); } - searchResponse.setFacetCounts(getFacetCounts(facetFields.get(), elasticsearchResponse.getAggregations(), commonColumnMetadata )); + searchResponse.setFacetCounts(getFacetCounts(facetFields, esResponse.getAggregations(), commonColumnMetadata )); } + + LOG.debug("Built search response; response={}", ElasticsearchUtils.toJSON(searchResponse).orElse("???")); return searchResponse; } @@ -188,42 +281,76 @@ public class ElasticsearchDao implements IndexDao { */ protected GroupResponse group(GroupRequest groupRequest, QueryBuilder queryBuilder) throws InvalidSearchException { + org.elasticsearch.action.search.SearchRequest esRequest; + org.elasticsearch.action.search.SearchResponse esResponse; + if (client == null) { throw new InvalidSearchException("Uninitialized Dao! You must call init() prior to use."); } if (groupRequest.getGroups() == null || groupRequest.getGroups().size() == 0) { throw new InvalidSearchException("At least 1 group must be provided."); } - final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(queryBuilder); - searchSourceBuilder.aggregation(getGroupsTermBuilder(groupRequest, 0)); - String[] wildcardIndices = wildcardIndices(groupRequest.getIndices()); - org.elasticsearch.action.search.SearchRequest request; - org.elasticsearch.action.search.SearchResponse response; - try { - request = new org.elasticsearch.action.search.SearchRequest(wildcardIndices) - .source(searchSourceBuilder); - response = client.search(request).actionGet(); - } catch (SearchPhaseExecutionException e) { - throw new InvalidSearchException("Could not execute search", e); - } + esRequest = buildGroupRequest(groupRequest, queryBuilder); + esResponse = requestSubmitter.submitSearch(esRequest); + GroupResponse response = buildGroupResponse(groupRequest, esResponse); + + return response; + } + + /** + * Builds a group search request. + * @param groupRequest The Metron group request. + * @param queryBuilder The search query. + * @return An Elasticsearch search request. + */ + private org.elasticsearch.action.search.SearchRequest buildGroupRequest( + GroupRequest groupRequest, + QueryBuilder queryBuilder) { + + // handle groups + TermsBuilder groups = getGroupsTermBuilder(groupRequest, 0); + final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() + .query(queryBuilder) + .aggregation(groups); + + // return the search request + String[] indices = wildcardIndices(groupRequest.getIndices()); + return new org.elasticsearch.action.search.SearchRequest() + .indices(indices) + .source(searchSourceBuilder); + } + + /** + * Build a group response. + * @param groupRequest The original group request. + * @param response The search response. + * @return A group response. + * @throws InvalidSearchException + */ + private GroupResponse buildGroupResponse( + GroupRequest groupRequest, + org.elasticsearch.action.search.SearchResponse response) throws InvalidSearchException { + + // build the search response Map<String, FieldType> commonColumnMetadata; try { commonColumnMetadata = getColumnMetadata(groupRequest.getIndices()); } catch (IOException e) { - throw new InvalidSearchException(String - .format("Could not get common column metadata for indices %s", + throw new InvalidSearchException(String.format("Could not get common column metadata for indices %s", Arrays.toString(groupRequest.getIndices().toArray()))); } + GroupResponse groupResponse = new GroupResponse(); groupResponse.setGroupedBy(groupRequest.getGroups().get(0).getField()); - groupResponse.setGroupResults( - getGroupResults(groupRequest, 0, response.getAggregations(), commonColumnMetadata)); + groupResponse.setGroupResults(getGroupResults(groupRequest, 0, response.getAggregations(), commonColumnMetadata)); return groupResponse; } private String[] wildcardIndices(List<String> indices) { + if(indices == null) + return new String[] {}; + return indices .stream() .map(index -> String.format("%s%s*", index, INDEX_NAME_DELIMITER)) @@ -235,26 +362,43 @@ public class ElasticsearchDao implements IndexDao { if(this.client == null) { this.client = ElasticsearchUtils.getClient(config.getGlobalConfigSupplier().get(), config.getOptionalSettings()); this.accessConfig = config; + this.columnMetadataDao = new ElasticsearchColumnMetadataDao(this.client.admin()); + this.requestSubmitter = new ElasticsearchRequestSubmitter(this.client); + } + + if(columnMetadataDao == null) { + throw new IllegalArgumentException("No ColumnMetadataDao available"); + } + + if(requestSubmitter == null) { + throw new IllegalArgumentException("No ElasticsearchRequestSubmitter available"); } } @Override public Document getLatest(final String guid, final String sensorType) throws IOException { - Optional<Document> ret = searchByGuid( - guid - , sensorType - , hit -> { - Long ts = 0L; - String doc = hit.getSourceAsString(); - String sourceType = Iterables.getFirst(Splitter.on("_doc").split(hit.getType()), null); - try { - return Optional.of(new Document(doc, guid, sourceType, ts)); - } catch (IOException e) { - throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e); - } - } - ); - return ret.orElse(null); + Optional<Document> doc = searchByGuid(guid, sensorType, hit -> toDocument(guid, hit)); + return doc.orElse(null); + } + + private Optional<Document> toDocument(final String guid, SearchHit hit) { + Long ts = 0L; + String doc = hit.getSourceAsString(); + String sourceType = toSourceType(hit.getType()); + try { + return Optional.of(new Document(doc, guid, sourceType, ts)); + } catch (IOException e) { + throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e); + } + } + + /** + * Returns the source type based on a given doc type. + * @param docType The document type. + * @return The source type. + */ + private String toSourceType(String docType) { + return Iterables.getFirst(Splitter.on("_doc").split(docType), null); } @Override @@ -394,8 +538,7 @@ public class ElasticsearchDao implements IndexDao { String type = sensorType + "_doc"; Object ts = update.getTimestamp(); IndexRequest indexRequest = new IndexRequest(indexName, type, update.getGuid()) - .source(update.getDocument()) - ; + .source(update.getDocument()); if(ts != null) { indexRequest = indexRequest.timestamp(ts.toString()); } @@ -403,77 +546,9 @@ public class ElasticsearchDao implements IndexDao { return indexRequest; } - @SuppressWarnings("unchecked") @Override public Map<String, FieldType> getColumnMetadata(List<String> indices) throws IOException { - Map<String, FieldType> indexColumnMetadata = new HashMap<>(); - - // Keep track of the last index used to inspect a field type so we can print a helpful error message on type mismatch - Map<String, String> previousIndices = new HashMap<>(); - // If we have detected a field type mismatch, ignore the field going forward since the type has been set to OTHER - Set<String> fieldBlackList = new HashSet<>(); - - String[] latestIndices = getLatestIndices(indices); - if (latestIndices.length > 0) { - ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings = client - .admin() - .indices() - .getMappings(new GetMappingsRequest().indices(latestIndices)) - .actionGet() - .getMappings(); - for (Object key : mappings.keys().toArray()) { - String indexName = key.toString(); - ImmutableOpenMap<String, MappingMetaData> mapping = mappings.get(indexName); - Iterator<String> mappingIterator = mapping.keysIt(); - while (mappingIterator.hasNext()) { - MappingMetaData mappingMetaData = mapping.get(mappingIterator.next()); - Map<String, Map<String, String>> map = (Map<String, Map<String, String>>) mappingMetaData - .getSourceAsMap().get("properties"); - for (String field : map.keySet()) { - if (!fieldBlackList.contains(field)) { - FieldType type = elasticsearchSearchTypeMap - .getOrDefault(map.get(field).get("type"), FieldType.OTHER); - if (indexColumnMetadata.containsKey(field)) { - FieldType previousType = indexColumnMetadata.get(field); - if (!type.equals(previousType)) { - String previousIndexName = previousIndices.get(field); - LOG.error(String.format( - "Field type mismatch: %s.%s has type %s while %s.%s has type %s. Defaulting type to %s.", - indexName, field, type.getFieldType(), - previousIndexName, field, previousType.getFieldType(), - FieldType.OTHER.getFieldType())); - indexColumnMetadata.put(field, FieldType.OTHER); - // Detected a type mismatch so ignore the field from now on - fieldBlackList.add(field); - } - } else { - indexColumnMetadata.put(field, type); - previousIndices.put(field, indexName); - } - } - } - } - } - } - return indexColumnMetadata; - } - - protected String[] getLatestIndices(List<String> includeIndices) { - Map<String, String> latestIndices = new HashMap<>(); - String[] indices = client.admin().indices().prepareGetIndex().setFeatures().get().getIndices(); - for (String index : indices) { - int prefixEnd = index.indexOf(INDEX_NAME_DELIMITER); - if (prefixEnd != -1) { - String prefix = index.substring(0, prefixEnd); - if (includeIndices.contains(prefix)) { - String latestIndex = latestIndices.get(prefix); - if (latestIndex == null || index.compareTo(latestIndex) > 0) { - latestIndices.put(prefix, index); - } - } - } - } - return latestIndices.values().toArray(new String[latestIndices.size()]); + return columnMetadataDao.getColumnMetadata(indices); } private org.elasticsearch.search.sort.SortOrder getElasticsearchSortOrder( @@ -588,4 +663,19 @@ public class ElasticsearchDao implements IndexDao { private String getSumAggregationName(String field) { return String.format("%s_score", field); } + + public ElasticsearchDao client(TransportClient client) { + this.client = client; + return this; + } + + public ElasticsearchDao columnMetadataDao(ColumnMetadataDao columnMetadataDao) { + this.columnMetadataDao = columnMetadataDao; + return this; + } + + public ElasticsearchDao accessConfig(AccessConfig accessConfig) { + this.accessConfig = accessConfig; + return this; + } } http://git-wip-us.apache.org/repos/asf/metron/blob/4a089900/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java new file mode 100644 index 0000000..0e0df21 --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitter.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.elasticsearch.dao; + +import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; +import org.apache.metron.indexing.dao.search.InvalidSearchException; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.rest.RestStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.invoke.MethodHandles; + +/** + * Responsible for submitting requests to Elasticsearch. + */ +public class ElasticsearchRequestSubmitter { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + /** + * The Elasticsearch client. + */ + private TransportClient client; + + public ElasticsearchRequestSubmitter(TransportClient client) { + this.client = client; + } + + /** + * Submit a search to Elasticsearch. + * @param request A search request. + * @return The search response. + */ + public SearchResponse submitSearch(SearchRequest request) throws InvalidSearchException { + LOG.debug("About to submit a search; request={}", ElasticsearchUtils.toJSON(request).orElse("???")); + + // submit the search request + org.elasticsearch.action.search.SearchResponse esResponse; + try { + esResponse = client + .search(request) + .actionGet(); + LOG.debug("Got Elasticsearch response; response={}", esResponse.toString()); + + } catch (SearchPhaseExecutionException e) { + String msg = String.format( + "Failed to execute search; error='%s', search='%s'", + ExceptionUtils.getRootCauseMessage(e), + ElasticsearchUtils.toJSON(request).orElse("???")); + LOG.error(msg, e); + throw new InvalidSearchException(msg, e); + } + + // check for shard failures + if(esResponse.getFailedShards() > 0) { + handleShardFailures(request, esResponse); + } + + // validate the response status + if(RestStatus.OK == esResponse.status()) { + return esResponse; + + } else { + // the search was not successful + String msg = String.format( + "Bad search response; status=%s, timeout=%s, terminatedEarly=%s", + esResponse.status(), esResponse.isTimedOut(), esResponse.isTerminatedEarly()); + LOG.error(msg); + throw new InvalidSearchException(msg); + } + } + + /** + * Handle individual shard failures that can occur even when the response is OK. These + * can indicate misconfiguration of the search indices. + * @param request The search request. + * @param response The search response. + */ + private void handleShardFailures( + org.elasticsearch.action.search.SearchRequest request, + org.elasticsearch.action.search.SearchResponse response) { + /* + * shard failures are only logged. the search itself is not failed. this approach + * assumes that a user is interested in partial search results, even if the + * entire search result set cannot be produced. + * + * for example, assume the user adds an additional sensor and the telemetry + * is indexed into a new search index. if that search index is misconfigured, + * it can result in partial shard failures. rather than failing the entire search, + * we log the error and allow the results to be returned from shards that + * are correctly configured. + */ + int errors = ArrayUtils.getLength(response.getShardFailures()); + LOG.error("Search resulted in {}/{} shards failing; errors={}, search={}", + response.getFailedShards(), + response.getTotalShards(), + errors, + ElasticsearchUtils.toJSON(request).orElse("???")); + + // log each reported failure + int failureCount=1; + for(ShardSearchFailure fail: response.getShardFailures()) { + String msg = String.format( + "Shard search failure [%s/%s]; reason=%s, index=%s, shard=%s, status=%s, nodeId=%s", + failureCount, + errors, + ExceptionUtils.getRootCauseMessage(fail.getCause()), + fail.index(), + fail.shardId(), + fail.status(), + fail.shard().getNodeId()); + LOG.error(msg, fail.getCause()); + } + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/4a089900/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java index 4c9933b..f29012a 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java @@ -22,10 +22,15 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import org.apache.commons.lang.StringUtils; import org.apache.metron.common.configuration.writer.WriterConfiguration; +import org.codehaus.jackson.map.ObjectMapper; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; +import org.elasticsearch.common.xcontent.XContentHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.lang.invoke.MethodHandles; import java.net.InetAddress; import java.net.UnknownHostException; import java.text.SimpleDateFormat; @@ -35,11 +40,14 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import static java.lang.String.format; public class ElasticsearchUtils { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static ThreadLocal<Map<String, SimpleDateFormat>> DATE_FORMAT_CACHE = ThreadLocal.withInitial(() -> new HashMap<>()); @@ -179,4 +187,48 @@ public class ElasticsearchUtils { } throw new IllegalStateException("Unable to read the elasticsearch ips, expected es.ip to be either a list of strings, a string hostname or a host:port string"); } + + /** + * Converts an Elasticsearch SearchRequest to JSON. + * @param esRequest The search request. + * @return The JSON representation of the SearchRequest. + */ + public static Optional<String> toJSON(org.elasticsearch.action.search.SearchRequest esRequest) { + Optional<String> json = Optional.empty(); + + if(esRequest != null) { + try { + json = Optional.of(XContentHelper.convertToJson(esRequest.source(), true)); + + } catch (Throwable t) { + LOG.error("Failed to convert search request to JSON", t); + } + } + + return json; + } + + /** + * Convert a SearchRequest to JSON. + * @param request The search request. + * @return The JSON representation of the SearchRequest. + */ + public static Optional<String> toJSON(Object request) { + Optional<String> json = Optional.empty(); + + if(request != null) { + try { + json = Optional.of( + new ObjectMapper() + .writer() + .withDefaultPrettyPrinter() + .writeValueAsString(request)); + + } catch (Throwable t) { + LOG.error("Failed to convert request to JSON", t); + } + } + + return json; + } } http://git-wip-us.apache.org/repos/asf/metron/blob/4a089900/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java new file mode 100644 index 0000000..0a83ee0 --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDaoTest.java @@ -0,0 +1,144 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.elasticsearch.dao; + +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.admin.indices.get.GetIndexRequestBuilder; +import org.elasticsearch.action.admin.indices.get.GetIndexResponse; +import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse; +import org.elasticsearch.client.AdminClient; +import org.elasticsearch.client.IndicesAdminClient; +import org.elasticsearch.cluster.metadata.MappingMetaData; +import org.elasticsearch.common.collect.ImmutableOpenMap; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertArrayEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests the ElasticsearchColumnMetadata class. + */ +public class ElasticsearchColumnMetadataDaoTest { + + /** + * @param indices The names of all indices that will exist. + * @return An object to test. + */ + public ElasticsearchColumnMetadataDao setup(String[] indices) { + return setup(indices, ImmutableOpenMap.of()); + } + + /** + * @param indices The names of all indices that will exist. + * @param mappings The index mappings. + * @return An object to test. + */ + public ElasticsearchColumnMetadataDao setup( + String[] indices, + ImmutableOpenMap<String, ImmutableOpenMap<String, MappingMetaData>> mappings) { + + AdminClient adminClient = mock(AdminClient.class); + IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class); + GetIndexRequestBuilder getIndexRequestBuilder = mock(GetIndexRequestBuilder.class); + GetIndexResponse getIndexResponse = mock(GetIndexResponse.class); + ActionFuture getMappingsActionFuture = mock(ActionFuture.class); + GetMappingsResponse getMappingsResponse = mock(GetMappingsResponse.class); + + // setup the mocks so that a set of indices are available to the DAO + when(adminClient.indices()).thenReturn(indicesAdminClient); + when(indicesAdminClient.prepareGetIndex()).thenReturn(getIndexRequestBuilder); + when(getIndexRequestBuilder.setFeatures()).thenReturn(getIndexRequestBuilder); + when(getIndexRequestBuilder.get()).thenReturn(getIndexResponse); + when(getIndexResponse.getIndices()).thenReturn(indices); + + // setup the mocks so that a set of mappings are available to the DAO + when(indicesAdminClient.getMappings(any())).thenReturn(getMappingsActionFuture); + when(getMappingsActionFuture.actionGet()).thenReturn(getMappingsResponse); + when(getMappingsResponse.getMappings()).thenReturn(mappings); + + return new ElasticsearchColumnMetadataDao(adminClient); + } + + @Test + public void testGetOneLatestIndex() { + + // setup + String[] existingIndices = new String[] { + "bro_index_2017.10.03.19", + "bro_index_2017.10.03.20", + "bro_index_2017.10.03.21", + "snort_index_2017.10.03.19", + "snort_index_2017.10.03.20", + "snort_index_2017.10.03.21" + }; + ElasticsearchColumnMetadataDao dao = setup(existingIndices); + + // get the latest indices + List<String> args = Collections.singletonList("bro"); + String[] actual = dao.getLatestIndices(args); + + // validation + String [] expected = new String[] { "bro_index_2017.10.03.21" }; + assertArrayEquals(expected, actual); + } + + @Test + public void testGetLatestIndices() { + // setup + String[] existingIndices = new String[] { + "bro_index_2017.10.03.19", + "bro_index_2017.10.03.20", + "bro_index_2017.10.03.21", + "snort_index_2017.10.03.19", + "snort_index_2017.10.03.19", + "snort_index_2017.10.03.21" + }; + ElasticsearchColumnMetadataDao dao = setup(existingIndices); + + // get the latest indices + List<String> args = Arrays.asList("bro", "snort"); + String[] actual = dao.getLatestIndices(args); + + // validation + String [] expected = new String[] { "bro_index_2017.10.03.21", "snort_index_2017.10.03.21" }; + assertArrayEquals(expected, actual); + } + + @Test + public void testLatestIndicesWhereNoneExist() { + + // setup - there are no existing indices + String[] existingIndices = new String[] {}; + ElasticsearchColumnMetadataDao dao = setup(existingIndices); + + // get the latest indices + List<String> args = Arrays.asList("bro", "snort"); + String[] actual = dao.getLatestIndices(args); + + // validation + String [] expected = new String[] {}; + assertArrayEquals(expected, actual); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/4a089900/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java index 7c33018..a6c0aa6 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java @@ -17,98 +17,209 @@ */ package org.apache.metron.elasticsearch.dao; -import org.apache.metron.elasticsearch.matcher.SearchRequestMatcher; +import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; import org.apache.metron.indexing.dao.AccessConfig; -import org.apache.metron.indexing.dao.IndexDao; -import org.apache.metron.indexing.dao.search.*; -import org.elasticsearch.action.ActionFuture; +import org.apache.metron.indexing.dao.search.FieldType; +import org.apache.metron.indexing.dao.search.InvalidSearchException; +import org.apache.metron.indexing.dao.search.SearchRequest; +import org.apache.metron.indexing.dao.search.SearchResponse; +import org.apache.metron.indexing.dao.search.SortField; +import org.apache.metron.indexing.dao.search.SortOrder; import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.rest.RestStatus; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; -import org.junit.Assert; -import org.junit.Before; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; import org.junit.Test; -import org.mockito.Mock; +import org.mockito.ArgumentCaptor; import java.util.Arrays; import java.util.HashMap; import java.util.List; +import java.util.Map; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.argThat; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; public class ElasticsearchDaoTest { - private IndexDao searchService; + private ElasticsearchDao dao; + private ElasticsearchRequestSubmitter requestSubmitter; - @Mock - TransportClient client; + private void setup(RestStatus status, int maxSearchResults, Map<String, FieldType> metadata) throws Exception { - @Before - public void setUp() throws Exception { - client = mock(TransportClient.class); + // setup the mock search hits + SearchHit hit1 = mock(SearchHit.class); + when(hit1.getId()).thenReturn("id1"); + when(hit1.getSource()).thenReturn(new HashMap<String, Object>(){{ put("field", "value1"); }}); + when(hit1.getScore()).thenReturn(0.1f); + + SearchHit hit2 = mock(SearchHit.class); + when(hit2.getId()).thenReturn("id2"); + when(hit2.getSource()).thenReturn(new HashMap<String, Object>(){{ put("field", "value2"); }}); + when(hit2.getScore()).thenReturn(0.2f); + + // search hits + SearchHit[] hits = { hit1, hit2 }; + SearchHits searchHits = mock(SearchHits.class); + when(searchHits.getHits()).thenReturn(hits); + when(searchHits.getTotalHits()).thenReturn(Integer.toUnsignedLong(hits.length)); + + // search response which returns the search hits + org.elasticsearch.action.search.SearchResponse response = mock(org.elasticsearch.action.search.SearchResponse.class); + when(response.status()).thenReturn(status); + when(response.getHits()).thenReturn(searchHits); + + // provides column metadata + ColumnMetadataDao columnMetadataDao = mock(ColumnMetadataDao.class); + when(columnMetadataDao.getColumnMetadata(any())).thenReturn(metadata); + + // returns the search response + requestSubmitter = mock(ElasticsearchRequestSubmitter.class); + when(requestSubmitter.submitSearch(any())).thenReturn(response); + + TransportClient client = mock(TransportClient.class); + + // provides configuration AccessConfig config = mock(AccessConfig.class); - when(config.getMaxSearchResults()).thenReturn(50); - searchService = new ElasticsearchDao(client, config); + when(config.getMaxSearchResults()).thenReturn(maxSearchResults); + + dao = new ElasticsearchDao(client, columnMetadataDao, requestSubmitter, config); + } + + private void setup(RestStatus status, int maxSearchResults) throws Exception { + setup(status, maxSearchResults, new HashMap<>()); } @Test - public void searchShouldProperlyBuildSearchRequest() throws Exception { + public void searchShouldSortByGivenFields() throws Exception { - // setup the mock client - SearchHit searchHit1 = mock(SearchHit.class); - when(searchHit1.getId()).thenReturn("id1"); - when(searchHit1.getSource()).thenReturn(new HashMap<String, Object>(){{ put("field", "value1"); }}); - when(searchHit1.getScore()).thenReturn(0.1f); + // setup the column metadata + Map<String, FieldType> columnMetadata = new HashMap<>(); + columnMetadata.put("sortByStringDesc", FieldType.STRING); + columnMetadata.put("sortByIntAsc", FieldType.INTEGER); - SearchHit searchHit2 = mock(SearchHit.class); - when(searchHit2.getId()).thenReturn("id2"); - when(searchHit2.getSource()).thenReturn(new HashMap<String, Object>(){{ put("field", "value2"); }}); - when(searchHit2.getScore()).thenReturn(0.2f); + // setup the dao + setup(RestStatus.OK, 25, columnMetadata); - SearchHits searchHits = mock(SearchHits.class); - when(searchHits.getHits()).thenReturn(new SearchHit[]{searchHit1, searchHit2}); - when(searchHits.getTotalHits()).thenReturn(2L); + // "sort by" fields for the search request + SortField[] expectedSortFields = { + sortBy("sortByStringDesc", SortOrder.DESC), + sortBy("sortByIntAsc", SortOrder.ASC), + sortBy("sortByUndefinedDesc", SortOrder.DESC) + }; - org.elasticsearch.action.search.SearchResponse elasticsearchResponse = mock(org.elasticsearch.action.search.SearchResponse.class); - when(elasticsearchResponse.getHits()).thenReturn(searchHits); + // create a metron search request + final List<String> indices = Arrays.asList("bro", "snort"); + SearchRequest searchRequest = new SearchRequest(); + searchRequest.setSize(2); + searchRequest.setIndices(indices); + searchRequest.setFrom(5); + searchRequest.setSort(Arrays.asList(expectedSortFields)); + searchRequest.setQuery("some query"); - ActionFuture actionFuture = mock(ActionFuture.class); - when(actionFuture.actionGet()).thenReturn(elasticsearchResponse); - when(client.search(any())).thenReturn(actionFuture); + // submit the metron search request + SearchResponse searchResponse = dao.search(searchRequest); + assertNotNull(searchResponse); + + // capture the elasticsearch search request that was created + ArgumentCaptor<org.elasticsearch.action.search.SearchRequest> argument = ArgumentCaptor.forClass(org.elasticsearch.action.search.SearchRequest.class); + verify(requestSubmitter).submitSearch(argument.capture()); + org.elasticsearch.action.search.SearchRequest request = argument.getValue(); + + // transform the request to JSON for validation + JSONParser parser = new JSONParser(); + JSONObject json = (JSONObject) parser.parse(ElasticsearchUtils.toJSON(request).orElse("???")); + + // validate the sort fields + JSONArray sortFields = (JSONArray) json.get("sort"); + assertEquals(3, sortFields.size()); + + { + // sort by string descending + JSONObject aSortField = (JSONObject) sortFields.get(0); + JSONObject sortBy = (JSONObject) aSortField.get("sortByStringDesc"); + assertEquals("desc", sortBy.get("order")); + assertEquals("_last", sortBy.get("missing")); + assertEquals("string", sortBy.get("unmapped_type")); + } + { + // sort by integer ascending + JSONObject aSortField = (JSONObject) sortFields.get(1); + JSONObject sortByIntAsc = (JSONObject) aSortField.get("sortByIntAsc"); + assertEquals("asc", sortByIntAsc.get("order")); + assertEquals("_first", sortByIntAsc.get("missing")); + assertEquals("integer", sortByIntAsc.get("unmapped_type")); + } + { + // sort by unknown type + JSONObject aSortField = (JSONObject) sortFields.get(2); + JSONObject sortByUndefinedDesc = (JSONObject) aSortField.get("sortByUndefinedDesc"); + assertEquals("desc", sortByUndefinedDesc.get("order")); + assertEquals("_last", sortByUndefinedDesc.get("missing")); + assertEquals("other", sortByUndefinedDesc.get("unmapped_type")); + } + } + + @Test + public void searchShouldWildcardIndices() throws Exception { + + // setup the dao + setup(RestStatus.OK, 25); // "sort by" fields for the search request - SortField[] sortFields = { - sortBy("sortField1", SortOrder.DESC), - sortBy("sortField2", SortOrder.ASC) + SortField[] expectedSortFields = { + sortBy("sortByStringDesc", SortOrder.DESC), + sortBy("sortByIntAsc", SortOrder.ASC), + sortBy("sortByUndefinedDesc", SortOrder.DESC) }; - // create a search request + // create a metron search request + final List<String> indices = Arrays.asList("bro", "snort"); SearchRequest searchRequest = new SearchRequest(); searchRequest.setSize(2); - searchRequest.setIndices(Arrays.asList("bro", "snort")); + searchRequest.setIndices(indices); searchRequest.setFrom(5); - searchRequest.setSort(Arrays.asList(sortFields)); + searchRequest.setSort(Arrays.asList(expectedSortFields)); searchRequest.setQuery("some query"); - // submit the search request - SearchResponse searchResponse = searchService.search(searchRequest); - - // validate - String[] expectedIndices = {"bro_index*", "snort_index*"}; - verify(client).search(argThat(new SearchRequestMatcher(expectedIndices, "some query", 2, 5, sortFields))); - assertEquals(2, searchResponse.getTotal()); - List<SearchResult> actualSearchResults = searchResponse.getResults(); - assertEquals(2, actualSearchResults.size()); - assertEquals("id1", actualSearchResults.get(0).getId()); - assertEquals("value1", actualSearchResults.get(0).getSource().get("field")); - assertEquals(0.1f, actualSearchResults.get(0).getScore(), 0.0f); - assertEquals("id2", actualSearchResults.get(1).getId()); - assertEquals("value2", actualSearchResults.get(1).getSource().get("field")); - assertEquals(0.2f, actualSearchResults.get(1).getScore(), 0.0f); - verifyNoMoreInteractions(client); + // submit the metron search request + SearchResponse searchResponse = dao.search(searchRequest); + assertNotNull(searchResponse); + + // capture the elasticsearch search request that was created + ArgumentCaptor<org.elasticsearch.action.search.SearchRequest> argument = ArgumentCaptor.forClass(org.elasticsearch.action.search.SearchRequest.class); + verify(requestSubmitter).submitSearch(argument.capture()); + org.elasticsearch.action.search.SearchRequest request = argument.getValue(); + + // transform the request to JSON for validation + JSONParser parser = new JSONParser(); + JSONObject json = (JSONObject) parser.parse(ElasticsearchUtils.toJSON(request).orElse("???")); + + // ensure that the index names are 'wildcard-ed' + String[] expected = { "bro_index*", "snort_index*" }; + assertArrayEquals(expected, request.indices()); + } + + + @Test(expected = InvalidSearchException.class) + public void searchShouldThrowExceptionWhenMaxResultsAreExceeded() throws Exception { + + int maxSearchResults = 20; + setup(RestStatus.OK, maxSearchResults); + + SearchRequest searchRequest = new SearchRequest(); + searchRequest.setSize(maxSearchResults+1); + + dao.search(searchRequest); + // exception expected - size > max } private SortField sortBy(String field, SortOrder order) { @@ -118,19 +229,4 @@ public class ElasticsearchDaoTest { return sortField; } - @Test - public void searchShouldThrowExceptionWhenMaxResultsAreExceeded() throws Exception { - SearchRequest searchRequest = new SearchRequest(); - searchRequest.setSize(51); - try { - searchService.search(searchRequest); - Assert.fail("Did not throw expected exception"); - } - catch(InvalidSearchException ise) { - Assert.assertEquals("Search result size must be less than 50", ise.getMessage()); - } - } - - - } http://git-wip-us.apache.org/repos/asf/metron/blob/4a089900/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java new file mode 100644 index 0000000..26f5fff --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.elasticsearch.dao; + +import org.apache.metron.indexing.dao.search.InvalidSearchException; +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.search.SearchShardTarget; +import org.junit.Test; +import org.mockito.Mockito; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ElasticsearchRequestSubmitterTest { + + private ElasticsearchRequestSubmitter submitter; + + public ElasticsearchRequestSubmitter setup(SearchResponse response) { + + // mocks + TransportClient client = mock(TransportClient.class); + ActionFuture future = Mockito.mock(ActionFuture.class); + + // the client should return the given search response + when(client.search(any())).thenReturn(future); + when(future.actionGet()).thenReturn(response); + + return new ElasticsearchRequestSubmitter(client); + } + + @Test + public void searchShouldSucceedWhenOK() throws InvalidSearchException { + + // mocks + SearchResponse response = mock(SearchResponse.class); + SearchRequest request = mock(SearchRequest.class); + + // response will have status of OK and no failed shards + when(response.status()).thenReturn(RestStatus.OK); + when(response.getFailedShards()).thenReturn(0); + when(response.getTotalShards()).thenReturn(2); + + // search should succeed + ElasticsearchRequestSubmitter submitter = setup(response); + SearchResponse actual = submitter.submitSearch(request); + assertNotNull(actual); + } + + @Test(expected = InvalidSearchException.class) + public void searchShouldFailWhenNotOK() throws InvalidSearchException { + + // mocks + SearchResponse response = mock(SearchResponse.class); + SearchRequest request = mock(SearchRequest.class); + + // response will have status of OK + when(response.status()).thenReturn(RestStatus.PARTIAL_CONTENT); + when(response.getFailedShards()).thenReturn(0); + when(response.getTotalShards()).thenReturn(2); + + // search should succeed + ElasticsearchRequestSubmitter submitter = setup(response); + submitter.submitSearch(request); + } + + @Test + public void searchShouldHandleShardFailure() throws InvalidSearchException { + // mocks + SearchResponse response = mock(SearchResponse.class); + SearchRequest request = mock(SearchRequest.class); + ShardSearchFailure fail = mock(ShardSearchFailure.class); + SearchShardTarget target = mock(SearchShardTarget.class); + + // response will have status of OK + when(response.status()).thenReturn(RestStatus.OK); + + // the response will report shard failures + when(response.getFailedShards()).thenReturn(1); + when(response.getTotalShards()).thenReturn(2); + + // the response will return the failures + ShardSearchFailure[] failures = { fail }; + when(response.getShardFailures()).thenReturn(failures); + + // shard failure needs to report the node + when(fail.shard()).thenReturn(target); + when(target.getNodeId()).thenReturn("node1"); + + // shard failure needs to report details of failure + when(fail.index()).thenReturn("bro_index_2017-10-11"); + when(fail.shardId()).thenReturn(1); + + // search should succeed, even with failed shards + ElasticsearchRequestSubmitter submitter = setup(response); + SearchResponse actual = submitter.submitSearch(request); + assertNotNull(actual); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/4a089900/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java index 07cc708..3d50e99 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java @@ -17,18 +17,11 @@ */ package org.apache.metron.elasticsearch.integration; - -import java.io.File; -import java.io.IOException; -import java.util.HashMap; -import java.util.concurrent.ExecutionException; import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.elasticsearch.dao.ElasticsearchDao; -import org.apache.metron.elasticsearch.dao.ElasticsearchMetaAlertDao; import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent; import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.IndexDao; -import org.apache.metron.indexing.dao.MetaAlertDao; import org.apache.metron.indexing.dao.SearchIntegrationTest; import org.apache.metron.integration.InMemoryComponent; import org.elasticsearch.action.bulk.BulkRequestBuilder; @@ -43,7 +36,13 @@ import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.concurrent.ExecutionException; + public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { + private static String indexDir = "target/elasticsearch_search"; private static String dateFormat = "yyyy.MM.dd.HH"; private static final int MAX_RETRIES = 10; @@ -53,19 +52,46 @@ public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { * { * "bro_doc": { * "properties": { - * "source:type": { "type": "string" }, - * "ip_src_addr": { "type": "ip" }, - * "ip_src_port": { "type": "integer" }, - * "long_field": { "type": "long" }, - * "timestamp" : { "type": "date" }, - * "latitude" : { "type": "float" }, - * "score": { "type": "double" }, - * "is_alert": { "type": "boolean" }, - * "location_point": { "type": "geo_point" }, - * "bro_field": { "type": "string" }, - * "duplicate_name_field": { "type": "string" } + * "source:type": { + * "type": "string", + * "index": "not_analyzed" + * }, + * "ip_src_addr": { + * "type": "ip" + * }, + * "ip_src_port": { + * "type": "integer" + * }, + * "long_field": { + * "type": "long" + * }, + * "timestamp": { + * "type": "date", + * "format": "epoch_millis" + * }, + * "latitude" : { + * "type": "float" + * }, + * "score": { + * "type": "double" + * }, + * "is_alert": { + * "type": "boolean" + * }, + * "location_point": { + * "type": "geo_point" + * }, + * "bro_field": { + * "type": "string" + * }, + * "duplicate_name_field": { + * "type": "string" + * }, + * "alert": { + * "type": "nested" + * } * } - * } + * } * } */ @Multiline @@ -73,21 +99,51 @@ public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { /** * { - * "snort_doc": { - * "properties": { - * "source:type": { "type": "string" }, - * "ip_src_addr": { "type": "ip" }, - * "ip_src_port": { "type": "integer" }, - * "long_field": { "type": "long" }, - * "timestamp" : { "type": "date" }, - * "latitude" : { "type": "float" }, - * "score": { "type": "double" }, - * "is_alert": { "type": "boolean" }, - * "location_point": { "type": "geo_point" }, - * "snort_field": { "type": "integer" }, - * "duplicate_name_field": { "type": "integer" } - * } - * } + * "snort_doc": { + * "properties": { + * "source:type": { + * "type": "string", + * "index": "not_analyzed" + * }, + * "ip_src_addr": { + * "type": "ip" + * }, + * "ip_src_port": { + * "type": "integer" + * }, + * "long_field": { + * "type": "long" + * }, + * "timestamp": { + * "type": "date", + * "format": "epoch_millis" + * }, + * "latitude" : { + * "type": "float" + * }, + * "score": { + * "type": "double" + * }, + * "is_alert": { + * "type": "boolean" + * }, + * "location_point": { + * "type": "geo_point" + * }, + * "snort_field": { + * "type": "integer" + * }, + * "duplicate_name_field": { + * "type": "integer" + * }, + * "alert": { + * "type": "nested" + * }, + * "threat:triage:score": { + * "type": "float" + * } + * } + * } * } */ @Multiline @@ -106,27 +162,23 @@ public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { @Multiline private static String metaAlertTypeMappings; - @Override protected IndexDao createDao() throws Exception { - IndexDao elasticsearchDao = new ElasticsearchDao(); - elasticsearchDao.init( - new AccessConfig() {{ - setMaxSearchResults(100); - setMaxSearchGroups(100); - setGlobalConfigSupplier( () -> - new HashMap<String, Object>() {{ - put("es.clustername", "metron"); - put("es.port", "9300"); - put("es.ip", "localhost"); - put("es.date.format", dateFormat); - }} - ); + AccessConfig config = new AccessConfig(); + config.setMaxSearchResults(100); + config.setMaxSearchGroups(100); + config.setGlobalConfigSupplier( () -> + new HashMap<String, Object>() {{ + put("es.clustername", "metron"); + put("es.port", "9300"); + put("es.ip", "localhost"); + put("es.date.format", dateFormat); }} ); - MetaAlertDao ret = new ElasticsearchMetaAlertDao(); - ret.init(elasticsearchDao); - return elasticsearchDao; + + IndexDao dao = new ElasticsearchDao(); + dao.init(config); + return dao; } @Override
