METRON-1419: Create a SolrDao this closes apache/incubator-metron#911
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/23113a63 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/23113a63 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/23113a63 Branch: refs/heads/feature/METRON-1416-upgrade-solr Commit: 23113a6337a3fc4d0bfbb708303b30bf8122f23f Parents: 644e951 Author: merrimanr <merrim...@gmail.com> Authored: Thu Feb 1 16:13:46 2018 -0500 Committer: cstella <ceste...@gmail.com> Committed: Thu Feb 1 16:13:46 2018 -0500 ---------------------------------------------------------------------- dependencies_with_url.csv | 3 + .../elasticsearch/dao/ColumnMetadataDao.java | 67 - .../dao/ElasticsearchColumnMetadataDao.java | 31 +- .../elasticsearch/dao/ElasticsearchDao.java | 651 +------ .../dao/ElasticsearchSearchDao.java | 565 ++++++ .../dao/ElasticsearchUpdateDao.java | 130 ++ .../elasticsearch/dao/ElasticsearchDaoTest.java | 8 +- .../ElasticsearchSearchIntegrationTest.java | 112 ++ .../ElasticsearchUpdateIntegrationTest.java | 219 +-- .../metron/indexing/dao/ColumnMetadataDao.java | 39 + .../metron/indexing/dao/search/SearchDao.java | 34 + .../metron/indexing/dao/update/UpdateDao.java | 30 + .../indexing/dao/SearchIntegrationTest.java | 215 +-- .../indexing/dao/UpdateIntegrationTest.java | 199 +++ metron-platform/metron-solr/pom.xml | 37 +- .../metron/solr/dao/SolrColumnMetadataDao.java | 120 ++ .../org/apache/metron/solr/dao/SolrDao.java | 117 ++ .../apache/metron/solr/dao/SolrSearchDao.java | 310 ++++ .../apache/metron/solr/dao/SolrUpdateDao.java | 100 ++ .../SolrIndexingIntegrationTest.java | 10 +- .../integration/SolrSearchIntegrationTest.java | 153 ++ .../integration/SolrUpdateIntegrationTest.java | 73 + .../integration/components/SolrComponent.java | 22 +- .../resources/config/bro/conf/managed-schema | 50 + .../resources/config/bro/conf/solrconfig.xml | 1601 ++++++++++++++++++ .../resources/config/snort/conf/managed-schema | 51 + 
.../resources/config/snort/conf/solrconfig.xml | 1601 ++++++++++++++++++ .../resources/config/test/conf/managed-schema | 68 + .../resources/config/test/conf/solrconfig.xml | 1601 ++++++++++++++++++ pom.xml | 2 +- 30 files changed, 7251 insertions(+), 968 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/dependencies_with_url.csv ---------------------------------------------------------------------- diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv index a1f431b..2bf1c76 100644 --- a/dependencies_with_url.csv +++ b/dependencies_with_url.csv @@ -21,6 +21,7 @@ com.esotericsoftware:reflectasm:jar:1.10.1:compile,New BSD License,http://code.g com.flipkart.zjsonpatch:zjsonpatch:jar:0.3.4:compile,Apache v2, https://github.com/flipkart-incubator/zjsonpatch com.google.protobuf:protobuf-java:jar:2.5.0:compile,New BSD license,http://code.google.com/p/protobuf com.google.protobuf:protobuf-java:jar:2.6.1:compile,New BSD license,http://code.google.com/p/protobuf +com.google.protobuf:protobuf-java:jar:3.1.0:compile,New BSD license,http://code.google.com/p/protobuf com.jcraft:jsch:jar:0.1.42:compile,BSD,http://www.jcraft.com/jsch/ com.maxmind.db:maxmind-db:jar:1.2.1:compile,CC-BY-SA 3.0,https://github.com/maxmind/MaxMind-DB com.maxmind.geoip2:geoip2:jar:2.8.0:compile,Apache v2,https://github.com/maxmind/GeoIP2-java @@ -78,6 +79,7 @@ org.jvnet.jaxb2_commons:jaxb2-basics-runtime:jar:0.6.5:compile,BSD,https://githu org.krakenapps:kraken-api:jar:2.1.1:compile, Apache v2, org.krakenapps:kraken-pcap:jar:1.7.1:compile, Apache v2, org.ow2.asm:asm:jar:4.0:compile,BSD,http://asm.ow2.org/ +org.ow2.asm:asm:jar:5.1:compile,BSD,http://asm.ow2.org/ org.slf4j:slf4j-api:jar:1.6.1:compile,MIT,http://www.slf4j.org org.slf4j:slf4j-api:jar:1.7.10:compile,MIT,http://www.slf4j.org org.slf4j:slf4j-api:jar:1.7.5:compile,MIT,http://www.slf4j.org @@ -91,6 +93,7 @@ 
org.slf4j:slf4j-log4j12:jar:1.7.21:compile,MIT,http://www.slf4j.org org.slf4j:slf4j-log4j12:jar:1.7.5:compile,MIT,http://www.slf4j.org org.slf4j:slf4j-log4j12:jar:1.7.7:compile,MIT,http://www.slf4j.org org.slf4j:slf4j-simple:jar:1.7.7:compile,MIT,http://www.slf4j.org +org.slf4j:jcl-over-slf4j:jar:1.7.7:compile,MIT,http://www.slf4j.org org.slf4j:jcl-over-slf4j:jar:1.7.21:compile,MIT,http://www.slf4j.org org.slf4j:jul-to-slf4j:jar:1.7.21:compile,MIT,http://www.slf4j.org aopalliance:aopalliance:jar:1.0:compile,Public Domain,http://aopalliance.sourceforge.net http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ColumnMetadataDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ColumnMetadataDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ColumnMetadataDao.java deleted file mode 100644 index 0393629..0000000 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ColumnMetadataDao.java +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.metron.elasticsearch.dao; - -import org.apache.metron.indexing.dao.search.FieldType; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -/** - * Responsible for retrieving column-level metadata about search indices. - */ -public interface ColumnMetadataDao { - - /** - * Retrieves column metadata for one or more search indices. - * @param indices The search indices to retrieve column metadata for. - * @return The column metadata, one set for each search index. - * @throws IOException - */ - Map<String, FieldType> getColumnMetadata(List<String> indices) throws IOException; - - /** - * Finds the latest version of a set of base indices. This can be used to find - * the latest 'bro' index, for example. - * - * Assuming the following indices exist... - * - * [ - * 'bro_index_2017.10.03.19' - * 'bro_index_2017.10.03.20', - * 'bro_index_2017.10.03.21', - * 'snort_index_2017.10.03.19', - * 'snort_index_2017.10.03.20', - * 'snort_index_2017.10.03.21' - * ] - * - * And the include indices are given as... - * - * ['bro', 'snort'] - * - * Then the latest indices are... - * - * ['bro_index_2017.10.03.21', 'snort_index_2017.10.03.21'] - * - * @param includeIndices The base names of the indices to include - * @return The latest version of a set of indices. 
- */ - String[] getLatestIndices(List<String> includeIndices); -} http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java index c12802e..6a8cad8 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java @@ -18,6 +18,7 @@ package org.apache.metron.elasticsearch.dao; +import org.apache.metron.indexing.dao.ColumnMetadataDao; import org.apache.metron.indexing.dao.search.FieldType; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; import org.elasticsearch.client.AdminClient; @@ -140,12 +141,32 @@ public class ElasticsearchColumnMetadataDao implements ColumnMetadataDao { } /** - * Retrieves the latest indices. - * @param includeIndices - * @return + * Finds the latest version of a set of base indices. This can be used to find + * the latest 'bro' index, for example. + * + * Assuming the following indices exist... + * + * [ + * 'bro_index_2017.10.03.19' + * 'bro_index_2017.10.03.20', + * 'bro_index_2017.10.03.21', + * 'snort_index_2017.10.03.19', + * 'snort_index_2017.10.03.20', + * 'snort_index_2017.10.03.21' + * ] + * + * And the include indices are given as... + * + * ['bro', 'snort'] + * + * Then the latest indices are... 
+ * + * ['bro_index_2017.10.03.21', 'snort_index_2017.10.03.21'] + * + * @param includeIndices The base names of the indices to include + * @return The latest version of a set of indices. */ - @Override - public String[] getLatestIndices(List<String> includeIndices) { + String[] getLatestIndices(List<String> includeIndices) { LOG.debug("Getting latest indices; indices={}", includeIndices); Map<String, String> latestIndices = new HashMap<>(); String[] indices = adminClient http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java index 650462e..7d6a9e5 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java @@ -17,96 +17,39 @@ */ package org.apache.metron.elasticsearch.dao; -import com.google.common.base.Splitter; -import com.google.common.collect.Iterables; +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.List; +import java.util.Map; +import java.util.Optional; import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.IndexDao; import org.apache.metron.indexing.dao.search.FieldType; import org.apache.metron.indexing.dao.search.GetRequest; -import org.apache.metron.indexing.dao.search.Group; -import org.apache.metron.indexing.dao.search.GroupOrder; -import org.apache.metron.indexing.dao.search.GroupOrderType; import 
org.apache.metron.indexing.dao.search.GroupRequest; import org.apache.metron.indexing.dao.search.GroupResponse; -import org.apache.metron.indexing.dao.search.GroupResult; import org.apache.metron.indexing.dao.search.InvalidSearchException; import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; -import org.apache.metron.indexing.dao.search.SearchResult; -import org.apache.metron.indexing.dao.search.SortField; -import org.apache.metron.indexing.dao.search.SortOrder; import org.apache.metron.indexing.dao.update.Document; -import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; -import org.elasticsearch.action.bulk.BulkRequestBuilder; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.index.IndexResponse; -import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo; import org.elasticsearch.client.transport.TransportClient; -import org.elasticsearch.cluster.metadata.MappingMetaData; -import org.elasticsearch.common.collect.ImmutableOpenMap; -import org.elasticsearch.index.mapper.LegacyIpFieldMapper; -import org.elasticsearch.index.query.IdsQueryBuilder; import org.elasticsearch.index.query.QueryBuilder; -import org.elasticsearch.index.query.QueryBuilders; -import org.elasticsearch.index.query.QueryStringQueryBuilder; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchHits; -import org.elasticsearch.search.aggregations.Aggregation; -import org.elasticsearch.search.aggregations.AggregationBuilders; -import org.elasticsearch.search.aggregations.Aggregations; -import org.elasticsearch.search.aggregations.bucket.terms.Terms; -import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket; -import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order; -import 
org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; -import org.elasticsearch.search.aggregations.metrics.sum.Sum; -import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder; -import org.elasticsearch.search.builder.SearchSourceBuilder; -import org.elasticsearch.search.sort.FieldSortBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.lang.invoke.MethodHandles; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.function.Function; - -import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER; - public class ElasticsearchDao implements IndexDao { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - /** - * The value required to ensure that Elasticsearch sorts missing values last. - */ - private static final String SORT_MISSING_LAST = "_last"; - - /** - * The value required to ensure that Elasticsearch sorts missing values last. - */ - private static final String SORT_MISSING_FIRST = "_first"; - - /** - * The Elasticsearch client. - */ private transient TransportClient client; + private ElasticsearchSearchDao searchDao; + private ElasticsearchUpdateDao updateDao; /** * Retrieves column metadata about search indices. */ - private ColumnMetadataDao columnMetadataDao; + private ElasticsearchColumnMetadataDao columnMetadataDao; /** * Handles the submission of search requests to Elasticsearch. 
@@ -116,10 +59,15 @@ public class ElasticsearchDao implements IndexDao { private AccessConfig accessConfig; protected ElasticsearchDao(TransportClient client, - ColumnMetadataDao columnMetadataDao, - ElasticsearchRequestSubmitter requestSubmitter, - AccessConfig config) { + AccessConfig config, + ElasticsearchSearchDao searchDao, + ElasticsearchUpdateDao updateDao, + ElasticsearchColumnMetadataDao columnMetadataDao, + ElasticsearchRequestSubmitter requestSubmitter + ) { this.client = client; + this.searchDao = searchDao; + this.updateDao = updateDao; this.columnMetadataDao = columnMetadataDao; this.requestSubmitter = requestSubmitter; this.accessConfig = config; @@ -129,266 +77,14 @@ public class ElasticsearchDao implements IndexDao { //uninitialized. } - private static Map<String, FieldType> elasticsearchSearchTypeMap; - - static { - Map<String, FieldType> fieldTypeMap = new HashMap<>(); - fieldTypeMap.put("text", FieldType.TEXT); - fieldTypeMap.put("keyword", FieldType.KEYWORD); - fieldTypeMap.put("ip", FieldType.IP); - fieldTypeMap.put("integer", FieldType.INTEGER); - fieldTypeMap.put("long", FieldType.LONG); - fieldTypeMap.put("date", FieldType.DATE); - fieldTypeMap.put("float", FieldType.FLOAT); - fieldTypeMap.put("double", FieldType.DOUBLE); - fieldTypeMap.put("boolean", FieldType.BOOLEAN); - elasticsearchSearchTypeMap = Collections.unmodifiableMap(fieldTypeMap); - } - - @Override - public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException { - if(searchRequest.getQuery() == null) { - throw new InvalidSearchException("Search query is invalid: null"); - } - return search(searchRequest, new QueryStringQueryBuilder(searchRequest.getQuery())); - } - - /** - * Defers to a provided {@link org.elasticsearch.index.query.QueryBuilder} for the query. - * @param request The request defining the parameters of the search - * @param queryBuilder The actual query to be run. 
Intended for if the SearchRequest requires wrapping - * @return The results of the query - * @throws InvalidSearchException When the query is malformed or the current state doesn't allow search - */ - protected SearchResponse search(SearchRequest request, QueryBuilder queryBuilder) throws InvalidSearchException { - org.elasticsearch.action.search.SearchRequest esRequest; - org.elasticsearch.action.search.SearchResponse esResponse; - - if(client == null) { - throw new InvalidSearchException("Uninitialized Dao! You must call init() prior to use."); - } - - if (request.getSize() > accessConfig.getMaxSearchResults()) { - throw new InvalidSearchException("Search result size must be less than " + accessConfig.getMaxSearchResults()); - } - - esRequest = buildSearchRequest(request, queryBuilder); - esResponse = requestSubmitter.submitSearch(esRequest); - return buildSearchResponse(request, esResponse); - } - - /** - * Builds an Elasticsearch search request. - * @param searchRequest The Metron search request. - * @param queryBuilder - * @return An Elasticsearch search request. 
- */ - private org.elasticsearch.action.search.SearchRequest buildSearchRequest( - SearchRequest searchRequest, - QueryBuilder queryBuilder) throws InvalidSearchException { - if (LOG.isDebugEnabled()) { - LOG.debug("Got search request; request={}", ElasticsearchUtils.toJSON(searchRequest).orElse("???")); - } - SearchSourceBuilder searchBuilder = new SearchSourceBuilder() - .size(searchRequest.getSize()) - .from(searchRequest.getFrom()) - .query(queryBuilder) - .trackScores(true); - Optional<List<String>> fields = searchRequest.getFields(); - // column metadata needed to understand the type of each sort field - Map<String, FieldType> meta; - try { - meta = getColumnMetadata(searchRequest.getIndices()); - } catch(IOException e) { - throw new InvalidSearchException("Unable to get column metadata", e); - } - - // handle sort fields - for(SortField sortField : searchRequest.getSort()) { - - // what type is the sort field? - FieldType sortFieldType = meta.getOrDefault(sortField.getField(), FieldType.OTHER); - - // sort order - if ascending missing values sorted last. 
otherwise, missing values sorted first - org.elasticsearch.search.sort.SortOrder sortOrder = getElasticsearchSortOrder(sortField.getSortOrder()); - String missingSortOrder; - if(sortOrder == org.elasticsearch.search.sort.SortOrder.DESC) { - missingSortOrder = SORT_MISSING_LAST; - } else { - missingSortOrder = SORT_MISSING_FIRST; - } - - // sort by the field - missing fields always last - FieldSortBuilder sortBy = new FieldSortBuilder(sortField.getField()) - .order(sortOrder) - .missing(missingSortOrder) - .unmappedType(sortFieldType.getFieldType()); - searchBuilder.sort(sortBy); - } - - // handle search fields - if (fields.isPresent()) { - searchBuilder.fetchSource("*", null); - } else { - searchBuilder.fetchSource(true); - } - - Optional<List<String>> facetFields = searchRequest.getFacetFields(); - - // handle facet fields - if (searchRequest.getFacetFields().isPresent()) { - // https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/_bucket_aggregations.html - for(String field : searchRequest.getFacetFields().get()) { - String name = getFacetAggregationName(field); - TermsAggregationBuilder terms = AggregationBuilders.terms( name).field(field); - // new TermsBuilder(name).field(field); - searchBuilder.aggregation(terms); - } - } - - // return the search request - String[] indices = wildcardIndices(searchRequest.getIndices()); - if (LOG.isDebugEnabled()) { - LOG.debug("Built Elasticsearch request; indices={}, request={}", indices, searchBuilder.toString()); - } - return new org.elasticsearch.action.search.SearchRequest() - .indices(indices) - .source(searchBuilder); - } - - /** - * Builds a search response. - * - * This effectively transforms an Elasticsearch search response into a Metron search response. - * - * @param searchRequest The Metron search request. - * @param esResponse The Elasticsearch search response. - * @return A Metron search response. 
- * @throws InvalidSearchException - */ - private SearchResponse buildSearchResponse( - SearchRequest searchRequest, - org.elasticsearch.action.search.SearchResponse esResponse) throws InvalidSearchException { - - SearchResponse searchResponse = new SearchResponse(); - - searchResponse.setTotal(esResponse.getHits().getTotalHits()); - - // search hits --> search results - List<SearchResult> results = new ArrayList<>(); - for(SearchHit hit: esResponse.getHits().getHits()) { - results.add(getSearchResult(hit, searchRequest.getFields())); - } - searchResponse.setResults(results); - - // handle facet fields - if (searchRequest.getFacetFields().isPresent()) { - List<String> facetFields = searchRequest.getFacetFields().get(); - Map<String, FieldType> commonColumnMetadata; - try { - commonColumnMetadata = getColumnMetadata(searchRequest.getIndices()); - } catch (IOException e) { - throw new InvalidSearchException(String.format( - "Could not get common column metadata for indices %s", - Arrays.toString(searchRequest.getIndices().toArray()))); - } - searchResponse.setFacetCounts(getFacetCounts(facetFields, esResponse.getAggregations(), commonColumnMetadata )); - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Built search response; response={}", ElasticsearchUtils.toJSON(searchResponse).orElse("???")); - } - return searchResponse; - } - - @Override - public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException { - return group(groupRequest, new QueryStringQueryBuilder(groupRequest.getQuery())); - } - - /** - * Defers to a provided {@link org.elasticsearch.index.query.QueryBuilder} for the query. - * @param groupRequest The request defining the parameters of the grouping - * @param queryBuilder The actual query to be run. 
Intended for if the SearchRequest requires wrapping - * @return The results of the query - * @throws InvalidSearchException When the query is malformed or the current state doesn't allow search - */ - protected GroupResponse group(GroupRequest groupRequest, QueryBuilder queryBuilder) - throws InvalidSearchException { - org.elasticsearch.action.search.SearchRequest esRequest; - org.elasticsearch.action.search.SearchResponse esResponse; - - if (client == null) { - throw new InvalidSearchException("Uninitialized Dao! You must call init() prior to use."); - } - if (groupRequest.getGroups() == null || groupRequest.getGroups().size() == 0) { - throw new InvalidSearchException("At least 1 group must be provided."); - } - - esRequest = buildGroupRequest(groupRequest, queryBuilder); - esResponse = requestSubmitter.submitSearch(esRequest); - GroupResponse response = buildGroupResponse(groupRequest, esResponse); - - return response; - } - - /** - * Builds a group search request. - * @param groupRequest The Metron group request. - * @param queryBuilder The search query. - * @return An Elasticsearch search request. - */ - private org.elasticsearch.action.search.SearchRequest buildGroupRequest( - GroupRequest groupRequest, - QueryBuilder queryBuilder) { - - // handle groups - TermsAggregationBuilder groups = getGroupsTermBuilder(groupRequest, 0); - final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() - .query(queryBuilder) - .aggregation(groups); - - // return the search request - String[] indices = wildcardIndices(groupRequest.getIndices()); - return new org.elasticsearch.action.search.SearchRequest() - .indices(indices) - .source(searchSourceBuilder); - } - - /** - * Build a group response. - * @param groupRequest The original group request. - * @param response The search response. - * @return A group response. 
- * @throws InvalidSearchException - */ - private GroupResponse buildGroupResponse( - GroupRequest groupRequest, - org.elasticsearch.action.search.SearchResponse response) throws InvalidSearchException { - - // build the search response - Map<String, FieldType> commonColumnMetadata; - try { - commonColumnMetadata = getColumnMetadata(groupRequest.getIndices()); - } catch (IOException e) { - throw new InvalidSearchException(String.format("Could not get common column metadata for indices %s", - Arrays.toString(groupRequest.getIndices().toArray()))); - } - - GroupResponse groupResponse = new GroupResponse(); - groupResponse.setGroupedBy(groupRequest.getGroups().get(0).getField()); - groupResponse.setGroupResults(getGroupResults(groupRequest, 0, response.getAggregations(), commonColumnMetadata)); - return groupResponse; + public ElasticsearchDao columnMetadataDao(ElasticsearchColumnMetadataDao columnMetadataDao) { + this.columnMetadataDao = columnMetadataDao; + return this; } - private String[] wildcardIndices(List<String> indices) { - if(indices == null) - return new String[] {}; - - return indices - .stream() - .map(index -> String.format("%s%s*", index, INDEX_NAME_DELIMITER)) - .toArray(value -> new String[indices.size()]); + public ElasticsearchDao accessConfig(AccessConfig accessConfig) { + this.accessConfig = accessConfig; + return this; } @Override @@ -398,6 +94,8 @@ public class ElasticsearchDao implements IndexDao { this.accessConfig = config; this.columnMetadataDao = new ElasticsearchColumnMetadataDao(this.client.admin()); this.requestSubmitter = new ElasticsearchRequestSubmitter(this.client); + this.searchDao = new ElasticsearchSearchDao(client, accessConfig, columnMetadataDao, requestSubmitter); + this.updateDao = new ElasticsearchUpdateDao(client, accessConfig, searchDao); } if(columnMetadataDao == null) { @@ -410,317 +108,54 @@ public class ElasticsearchDao implements IndexDao { } @Override - public Document getLatest(final String guid, final String 
sensorType) throws IOException { - Optional<Document> doc = searchByGuid(guid, sensorType, hit -> toDocument(guid, hit)); - return doc.orElse(null); + public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException { + return this.searchDao.search(searchRequest); } - private Optional<Document> toDocument(final String guid, SearchHit hit) { - Long ts = 0L; - String doc = hit.getSourceAsString(); - String sourceType = toSourceType(hit.getType()); - try { - return Optional.of(new Document(doc, guid, sourceType, ts)); - } catch (IOException e) { - throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e); - } + @Override + public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException { + return this.searchDao.group(groupRequest); } - /** - * Returns the source type based on a given doc type. - * @param docType The document type. - * @return The source type. - */ - private String toSourceType(String docType) { - return Iterables.getFirst(Splitter.on("_doc").split(docType), null); + @Override + public Document getLatest(final String guid, final String sensorType) throws IOException { + return searchDao.getLatest(guid, sensorType); } @Override public Iterable<Document> getAllLatest( final List<GetRequest> getRequests) throws IOException { - Collection<String> guids = new HashSet<>(); - Collection<String> sensorTypes = new HashSet<>(); - for (GetRequest getRequest: getRequests) { - guids.add(getRequest.getGuid()); - sensorTypes.add(getRequest.getSensorType()); - } - List<Document> documents = searchByGuids( - guids - , sensorTypes - , hit -> { - Long ts = 0L; - String doc = hit.getSourceAsString(); - String sourceType = Iterables.getFirst(Splitter.on("_doc").split(hit.getType()), null); - try { - return Optional.of(new Document(doc, hit.getId(), sourceType, ts)); - } catch (IOException e) { - throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e); - } - } - - ); - return 
documents; - } - - <T> Optional<T> searchByGuid(String guid, String sensorType, - Function<SearchHit, Optional<T>> callback) { - Collection<String> sensorTypes = sensorType != null ? Collections.singleton(sensorType) : null; - List<T> results = searchByGuids(Collections.singleton(guid), sensorTypes, callback); - if (results.size() > 0) { - return Optional.of(results.get(0)); - } else { - return Optional.empty(); - } - } - - /** - * Return the search hit based on the UUID and sensor type. - * A callback can be specified to transform the hit into a type T. - * If more than one hit happens, the first one will be returned. - */ - <T> List<T> searchByGuids(Collection<String> guids, Collection<String> sensorTypes, - Function<SearchHit, Optional<T>> callback) { - if(guids == null || guids.isEmpty()) { - return Collections.EMPTY_LIST; - } - QueryBuilder query = null; - IdsQueryBuilder idsQuery = null; - if (sensorTypes != null) { - String[] types = sensorTypes.stream().map(sensorType -> sensorType + "_doc").toArray(String[]::new); - idsQuery = QueryBuilders.idsQuery(types); - } else { - idsQuery = QueryBuilders.idsQuery(); - } - - for(String guid : guids) { - query = idsQuery.addIds(guid); - } - - SearchRequestBuilder request = client.prepareSearch() - .setQuery(query) - .setSize(guids.size()) - ; - org.elasticsearch.action.search.SearchResponse response = request.get(); - SearchHits hits = response.getHits(); - List<T> results = new ArrayList<>(); - for (SearchHit hit : hits) { - Optional<T> result = callback.apply(hit); - if (result.isPresent()) { - results.add(result.get()); - } - } - return results; + return searchDao.getAllLatest(getRequests); } @Override public void update(Document update, Optional<String> index) throws IOException { - String indexPostfix = ElasticsearchUtils - .getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date()); - String sensorType = update.getSensorType(); - String indexName = getIndexName(update, index, indexPostfix); 
- - IndexRequest indexRequest = buildIndexRequest(update, sensorType, indexName); - try { - IndexResponse response = client.index(indexRequest).get(); - - ShardInfo shardInfo = response.getShardInfo(); - int failed = shardInfo.getFailed(); - if (failed > 0) { - throw new IOException( - "ElasticsearchDao index failed: " + Arrays.toString(shardInfo.getFailures())); - } - } catch (Exception e) { - throw new IOException(e.getMessage(), e); - } + updateDao.update(update, index); } @Override public void batchUpdate(Map<Document, Optional<String>> updates) throws IOException { - String indexPostfix = ElasticsearchUtils - .getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date()); - - BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); - - // Get the indices we'll actually be using for each Document. - for (Map.Entry<Document, Optional<String>> updateEntry : updates.entrySet()) { - Document update = updateEntry.getKey(); - String sensorType = update.getSensorType(); - String indexName = getIndexName(update, updateEntry.getValue(), indexPostfix); - IndexRequest indexRequest = buildIndexRequest( - update, - sensorType, - indexName - ); - - bulkRequestBuilder.add(indexRequest); - } - - BulkResponse bulkResponse = bulkRequestBuilder.get(); - if (bulkResponse.hasFailures()) { - LOG.error("Bulk Request has failures: {}", bulkResponse.buildFailureMessage()); - throw new IOException( - "ElasticsearchDao upsert failed: " + bulkResponse.buildFailureMessage()); - } - } - - protected String getIndexName(Document update, Optional<String> index, String indexPostFix) { - return index.orElse(getIndexName(update.getGuid(), update.getSensorType()) - .orElse(ElasticsearchUtils.getIndexName(update.getSensorType(), indexPostFix, null)) - ); - } - - protected Optional<String> getIndexName(String guid, String sensorType) { - return searchByGuid(guid, - sensorType, - hit -> Optional.ofNullable(hit.getIndex()) - ); - } - - protected IndexRequest 
buildIndexRequest(Document update, String sensorType, String indexName) { - String type = sensorType + "_doc"; - Object ts = update.getTimestamp(); - IndexRequest indexRequest = new IndexRequest(indexName, type, update.getGuid()) - .source(update.getDocument()); - if(ts != null) { - indexRequest = indexRequest.timestamp(ts.toString()); - } - - return indexRequest; + updateDao.batchUpdate(updates); } @Override public Map<String, FieldType> getColumnMetadata(List<String> indices) throws IOException { - return columnMetadataDao.getColumnMetadata(indices); - } - - private org.elasticsearch.search.sort.SortOrder getElasticsearchSortOrder( - org.apache.metron.indexing.dao.search.SortOrder sortOrder) { - return sortOrder == org.apache.metron.indexing.dao.search.SortOrder.DESC ? - org.elasticsearch.search.sort.SortOrder.DESC : org.elasticsearch.search.sort.SortOrder.ASC; - } - - private Order getElasticsearchGroupOrder(GroupOrder groupOrder) { - if (groupOrder.getGroupOrderType() == GroupOrderType.TERM) { - return groupOrder.getSortOrder() == SortOrder.ASC ? Order.term(true) : Order.term(false); - } else { - return groupOrder.getSortOrder() == SortOrder.ASC ? 
Order.count(true) : Order.count(false); - } - } - - public Map<String, Map<String, Long>> getFacetCounts(List<String> fields, Aggregations aggregations, Map<String, FieldType> commonColumnMetadata) { - Map<String, Map<String, Long>> fieldCounts = new HashMap<>(); - for (String field: fields) { - Map<String, Long> valueCounts = new HashMap<>(); - Aggregation aggregation = aggregations.get(getFacetAggregationName(field)); - if (aggregation instanceof Terms) { - Terms terms = (Terms) aggregation; - terms.getBuckets().stream().forEach(bucket -> valueCounts.put(formatKey(bucket.getKey(), commonColumnMetadata.get(field)), bucket.getDocCount())); - } - fieldCounts.put(field, valueCounts); - } - return fieldCounts; - } - - private String formatKey(Object key, FieldType type) { - if (FieldType.IP.equals(type) && key instanceof Long) { - return LegacyIpFieldMapper.longToIp((Long) key); - } else if (FieldType.BOOLEAN.equals(type)) { - return (Long) key == 1 ? "true" : "false"; - } else { - return key.toString(); - } - } - - private TermsAggregationBuilder getGroupsTermBuilder(GroupRequest groupRequest, int index) { - List<Group> groups = groupRequest.getGroups(); - Group group = groups.get(index); - String aggregationName = getGroupByAggregationName(group.getField()); - TermsAggregationBuilder termsBuilder = AggregationBuilders.terms(aggregationName); - termsBuilder - .field(group.getField()) - .size(accessConfig.getMaxSearchGroups()) - .order(getElasticsearchGroupOrder(group.getOrder())); - if (index < groups.size() - 1) { - termsBuilder.subAggregation(getGroupsTermBuilder(groupRequest, index + 1)); - } - Optional<String> scoreField = groupRequest.getScoreField(); - if (scoreField.isPresent()) { - SumAggregationBuilder scoreSumAggregationBuilder = AggregationBuilders.sum(getSumAggregationName(scoreField.get())).field(scoreField.get()).missing(0); - termsBuilder.subAggregation(scoreSumAggregationBuilder); - } - return termsBuilder; + return 
this.columnMetadataDao.getColumnMetadata(indices); } - private List<GroupResult> getGroupResults(GroupRequest groupRequest, int index, Aggregations aggregations, Map<String, FieldType> commonColumnMetadata) { - List<Group> groups = groupRequest.getGroups(); - String field = groups.get(index).getField(); - Terms terms = aggregations.get(getGroupByAggregationName(field)); - List<GroupResult> searchResultGroups = new ArrayList<>(); - for(Bucket bucket: terms.getBuckets()) { - GroupResult groupResult = new GroupResult(); - groupResult.setKey(formatKey(bucket.getKey(), commonColumnMetadata.get(field))); - groupResult.setTotal(bucket.getDocCount()); - Optional<String> scoreField = groupRequest.getScoreField(); - if (scoreField.isPresent()) { - Sum score = bucket.getAggregations().get(getSumAggregationName(scoreField.get())); - groupResult.setScore(score.getValue()); - } - if (index < groups.size() - 1) { - groupResult.setGroupedBy(groups.get(index + 1).getField()); - groupResult.setGroupResults(getGroupResults(groupRequest, index + 1, bucket.getAggregations(), commonColumnMetadata)); - } - searchResultGroups.add(groupResult); - } - return searchResultGroups; + protected Optional<String> getIndexName(String guid, String sensorType) { + return updateDao.getIndexName(guid, sensorType); } - private SearchResult getSearchResult(SearchHit searchHit, Optional<List<String>> fields) { - SearchResult searchResult = new SearchResult(); - searchResult.setId(searchHit.getId()); - Map<String, Object> source; - if (fields.isPresent()) { - Map<String, Object> resultSourceAsMap = searchHit.getSourceAsMap(); - source = new HashMap<>(); - fields.get().forEach(field -> { - source.put(field, resultSourceAsMap.get(field)); - }); - } else { - source = searchHit.getSource(); - } - searchResult.setSource(source); - searchResult.setScore(searchHit.getScore()); - searchResult.setIndex(searchHit.getIndex()); - return searchResult; + protected SearchResponse search(SearchRequest request, 
QueryBuilder queryBuilder) throws InvalidSearchException { + return searchDao.search(request, queryBuilder); } - private String getFacetAggregationName(String field) { - return String.format("%s_count", field); + protected GroupResponse group(GroupRequest groupRequest, QueryBuilder queryBuilder) throws InvalidSearchException { + return searchDao.group(groupRequest, queryBuilder); } public TransportClient getClient() { - return client; - } - - private String getGroupByAggregationName(String field) { - return String.format("%s_group", field); - } - - private String getSumAggregationName(String field) { - return String.format("%s_score", field); - } - - public ElasticsearchDao client(TransportClient client) { - this.client = client; - return this; - } - - public ElasticsearchDao columnMetadataDao(ColumnMetadataDao columnMetadataDao) { - this.columnMetadataDao = columnMetadataDao; - return this; - } - - public ElasticsearchDao accessConfig(AccessConfig accessConfig) { - this.accessConfig = accessConfig; - return this; + return this.client; } } http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java new file mode 100644 index 0000000..5e9ed02 --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchSearchDao.java @@ -0,0 +1,565 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.elasticsearch.dao; + +import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER; + +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; +import org.apache.metron.indexing.dao.AccessConfig; +import org.apache.metron.indexing.dao.search.FieldType; +import org.apache.metron.indexing.dao.search.GetRequest; +import org.apache.metron.indexing.dao.search.Group; +import org.apache.metron.indexing.dao.search.GroupOrder; +import org.apache.metron.indexing.dao.search.GroupOrderType; +import org.apache.metron.indexing.dao.search.GroupRequest; +import org.apache.metron.indexing.dao.search.GroupResponse; +import org.apache.metron.indexing.dao.search.GroupResult; +import org.apache.metron.indexing.dao.search.InvalidSearchException; +import org.apache.metron.indexing.dao.search.SearchDao; +import org.apache.metron.indexing.dao.search.SearchRequest; +import 
org.apache.metron.indexing.dao.search.SearchResponse; +import org.apache.metron.indexing.dao.search.SearchResult; +import org.apache.metron.indexing.dao.search.SortField; +import org.apache.metron.indexing.dao.search.SortOrder; +import org.apache.metron.indexing.dao.update.Document; +import org.elasticsearch.action.search.SearchRequestBuilder; +import org.elasticsearch.client.transport.TransportClient; +import org.elasticsearch.index.mapper.LegacyIpFieldMapper; +import org.elasticsearch.index.query.IdsQueryBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.QueryStringQueryBuilder; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import org.elasticsearch.search.aggregations.Aggregation; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.Aggregations; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket; +import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder; +import org.elasticsearch.search.aggregations.metrics.sum.Sum; +import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.FieldSortBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ElasticsearchSearchDao implements SearchDao { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + /** + * The value required to ensure that Elasticsearch sorts missing values last. + */ + private static final String SORT_MISSING_LAST = "_last"; + + /** + * The value required to ensure that Elasticsearch sorts missing values first.
+ */ + private static final String SORT_MISSING_FIRST = "_first"; + + private transient TransportClient client; + private AccessConfig accessConfig; + private ElasticsearchColumnMetadataDao columnMetadataDao; + private ElasticsearchRequestSubmitter requestSubmitter; + + public ElasticsearchSearchDao(TransportClient client, + AccessConfig accessConfig, + ElasticsearchColumnMetadataDao columnMetadataDao, + ElasticsearchRequestSubmitter requestSubmitter) { + this.client = client; + this.accessConfig = accessConfig; + this.columnMetadataDao = columnMetadataDao; + this.requestSubmitter = requestSubmitter; + } + + @Override + public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException { + if(searchRequest.getQuery() == null) { + throw new InvalidSearchException("Search query is invalid: null"); + } + return search(searchRequest, new QueryStringQueryBuilder(searchRequest.getQuery())); + } + + @Override + public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException { + return group(groupRequest, new QueryStringQueryBuilder(groupRequest.getQuery())); + } + + @Override + public Document getLatest(String guid, String sensorType) throws IOException { + Optional<Document> doc = searchByGuid(guid, sensorType, hit -> toDocument(guid, hit)); + return doc.orElse(null); + } + + <T> Optional<T> searchByGuid(String guid, String sensorType, + Function<SearchHit, Optional<T>> callback) { + Collection<String> sensorTypes = sensorType != null ?
Collections.singleton(sensorType) : null; + List<T> results = searchByGuids(Collections.singleton(guid), sensorTypes, callback); + if (results.size() > 0) { + return Optional.of(results.get(0)); + } else { + return Optional.empty(); + } + } + + @Override + public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException { + Collection<String> guids = new HashSet<>(); + Collection<String> sensorTypes = new HashSet<>(); + for (GetRequest getRequest: getRequests) { + guids.add(getRequest.getGuid()); + sensorTypes.add(getRequest.getSensorType()); + } + List<Document> documents = searchByGuids( + guids + , sensorTypes + , hit -> { + Long ts = 0L; + String doc = hit.getSourceAsString(); + String sourceType = Iterables.getFirst(Splitter.on("_doc").split(hit.getType()), null); + try { + return Optional.of(new Document(doc, hit.getId(), sourceType, ts)); + } catch (IOException e) { + throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e); + } + } + + ); + return documents; + } + + /** + * Defers to a provided {@link org.elasticsearch.index.query.QueryBuilder} for the query. + * @param request The request defining the parameters of the search + * @param queryBuilder The actual query to be run. Intended for if the SearchRequest requires wrapping + * @return The results of the query + * @throws InvalidSearchException When the query is malformed, the requested size exceeds the configured maximum, or the current state doesn't allow search + */ + protected SearchResponse search(SearchRequest request, QueryBuilder queryBuilder) throws InvalidSearchException { + org.elasticsearch.action.search.SearchRequest esRequest; + org.elasticsearch.action.search.SearchResponse esResponse; + + if(client == null) { + throw new InvalidSearchException("Uninitialized Dao!
You must call init() prior to use."); + } + + if (request.getSize() > accessConfig.getMaxSearchResults()) { + throw new InvalidSearchException("Search result size must be less than " + accessConfig.getMaxSearchResults()); + } + + esRequest = buildSearchRequest(request, queryBuilder); + esResponse = requestSubmitter.submitSearch(esRequest); + return buildSearchResponse(request, esResponse); + } + + /** + * Builds an Elasticsearch search request. + * @param searchRequest The Metron search request. + * @param queryBuilder The query to run; it becomes the query of the returned request. + * @return An Elasticsearch search request. + */ + private org.elasticsearch.action.search.SearchRequest buildSearchRequest( + SearchRequest searchRequest, + QueryBuilder queryBuilder) throws InvalidSearchException { + if (LOG.isDebugEnabled()) { + LOG.debug("Got search request; request={}", ElasticsearchUtils.toJSON(searchRequest).orElse("???")); + } + SearchSourceBuilder searchBuilder = new SearchSourceBuilder() + .size(searchRequest.getSize()) + .from(searchRequest.getFrom()) + .query(queryBuilder) + .trackScores(true); + Optional<List<String>> fields = searchRequest.getFields(); + // column metadata needed to understand the type of each sort field + Map<String, FieldType> meta; + try { + meta = columnMetadataDao.getColumnMetadata(searchRequest.getIndices()); + } catch(IOException e) { + throw new InvalidSearchException("Unable to get column metadata", e); + } + + // handle sort fields + for(SortField sortField : searchRequest.getSort()) { + + // what type is the sort field? + FieldType sortFieldType = meta.getOrDefault(sortField.getField(), FieldType.OTHER); + + // sort order - if descending, missing values sorted last.
otherwise, missing values sorted first + org.elasticsearch.search.sort.SortOrder sortOrder = getElasticsearchSortOrder(sortField.getSortOrder()); + String missingSortOrder; + if(sortOrder == org.elasticsearch.search.sort.SortOrder.DESC) { + missingSortOrder = SORT_MISSING_LAST; + } else { + missingSortOrder = SORT_MISSING_FIRST; + } + + // sort by the field - missing values are placed according to missingSortOrder + FieldSortBuilder sortBy = new FieldSortBuilder(sortField.getField()) + .order(sortOrder) + .missing(missingSortOrder) + .unmappedType(sortFieldType.getFieldType()); + searchBuilder.sort(sortBy); + } + + // handle search fields - the full source is fetched either way; getSearchResult() filters it to the requested fields + if (fields.isPresent()) { + searchBuilder.fetchSource("*", null); + } else { + searchBuilder.fetchSource(true); + } + + Optional<List<String>> facetFields = searchRequest.getFacetFields(); + + // handle facet fields (NOTE(review): the facetFields local above is unused; this block re-reads searchRequest.getFacetFields()) + if (searchRequest.getFacetFields().isPresent()) { + // https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/_bucket_aggregations.html + for(String field : searchRequest.getFacetFields().get()) { + String name = getFacetAggregationName(field); + TermsAggregationBuilder terms = AggregationBuilders.terms( name).field(field); + // one terms aggregation per facet field, named <field>_count + searchBuilder.aggregation(terms); + } + } + + // return the search request + String[] indices = wildcardIndices(searchRequest.getIndices()); + if (LOG.isDebugEnabled()) { + LOG.debug("Built Elasticsearch request; indices={}, request={}", indices, searchBuilder.toString()); + } + return new org.elasticsearch.action.search.SearchRequest() + .indices(indices) + .source(searchBuilder); + } + + /** + * Builds a search response. + * + * This effectively transforms an Elasticsearch search response into a Metron search response. 
+ * + * @param searchRequest The Metron search request. + * @param esResponse The Elasticsearch search response. + * @return A Metron search response.
+ * @throws InvalidSearchException If facet fields are requested and the column metadata for the indices cannot be retrieved (NOTE(review): the IOException cause is not chained) + */ + private SearchResponse buildSearchResponse( + SearchRequest searchRequest, + org.elasticsearch.action.search.SearchResponse esResponse) throws InvalidSearchException { + + SearchResponse searchResponse = new SearchResponse(); + + searchResponse.setTotal(esResponse.getHits().getTotalHits()); + + // search hits --> search results + List<SearchResult> results = new ArrayList<>(); + for(SearchHit hit: esResponse.getHits().getHits()) { + results.add(getSearchResult(hit, searchRequest.getFields())); + } + searchResponse.setResults(results); + + // handle facet fields + if (searchRequest.getFacetFields().isPresent()) { + List<String> facetFields = searchRequest.getFacetFields().get(); + Map<String, FieldType> commonColumnMetadata; + try { + commonColumnMetadata = columnMetadataDao.getColumnMetadata(searchRequest.getIndices()); + } catch (IOException e) { + throw new InvalidSearchException(String.format( + "Could not get common column metadata for indices %s", + Arrays.toString(searchRequest.getIndices().toArray()))); + } + searchResponse.setFacetCounts(getFacetCounts(facetFields, esResponse.getAggregations(), commonColumnMetadata )); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Built search response; response={}", ElasticsearchUtils.toJSON(searchResponse).orElse("???")); + } + return searchResponse; + } + + private org.elasticsearch.search.sort.SortOrder getElasticsearchSortOrder( + org.apache.metron.indexing.dao.search.SortOrder sortOrder) { + return sortOrder == org.apache.metron.indexing.dao.search.SortOrder.DESC ?
+ org.elasticsearch.search.sort.SortOrder.DESC : org.elasticsearch.search.sort.SortOrder.ASC; + } + + private String getFacetAggregationName(String field) { + return String.format("%s_count", field); + } + + private String[] wildcardIndices(List<String> indices) { + if(indices == null) + return new String[] {}; + + return indices + .stream() + .map(index -> String.format("%s%s*", index, INDEX_NAME_DELIMITER)) + .toArray(value -> new String[indices.size()]); + } + + private SearchResult getSearchResult(SearchHit searchHit, Optional<List<String>> fields) { + SearchResult searchResult = new SearchResult(); + searchResult.setId(searchHit.getId()); + Map<String, Object> source; + if (fields.isPresent()) { + Map<String, Object> resultSourceAsMap = searchHit.getSourceAsMap(); + source = new HashMap<>(); + fields.get().forEach(field -> { + source.put(field, resultSourceAsMap.get(field)); + }); + } else { + source = searchHit.getSource(); + } + searchResult.setSource(source); + searchResult.setScore(searchHit.getScore()); + searchResult.setIndex(searchHit.getIndex()); + return searchResult; + } + + private Map<String, Map<String, Long>> getFacetCounts(List<String> fields, Aggregations aggregations, Map<String, FieldType> commonColumnMetadata) { + Map<String, Map<String, Long>> fieldCounts = new HashMap<>(); + for (String field: fields) { + Map<String, Long> valueCounts = new HashMap<>(); + Aggregation aggregation = aggregations.get(getFacetAggregationName(field)); + if (aggregation instanceof Terms) { + Terms terms = (Terms) aggregation; + terms.getBuckets().stream().forEach(bucket -> valueCounts.put(formatKey(bucket.getKey(), commonColumnMetadata.get(field)), bucket.getDocCount())); + } + fieldCounts.put(field, valueCounts); + } + return fieldCounts; + } + + private String formatKey(Object key, FieldType type) { + if (FieldType.IP.equals(type) && key instanceof Long) { + return LegacyIpFieldMapper.longToIp((Long) key); + } else if (FieldType.BOOLEAN.equals(type)) { +
return (Long) key == 1 ? "true" : "false"; + } else { + return key.toString(); + } + } + + /** + * Defers to a provided {@link org.elasticsearch.index.query.QueryBuilder} for the query. + * @param groupRequest The request defining the parameters of the grouping + * @param queryBuilder The actual query to be run. Intended for if the GroupRequest requires wrapping + * @return The results of the query + * @throws InvalidSearchException When the query is malformed, no groups are provided, or the current state doesn't allow search + */ + protected GroupResponse group(GroupRequest groupRequest, QueryBuilder queryBuilder) + throws InvalidSearchException { + org.elasticsearch.action.search.SearchRequest esRequest; + org.elasticsearch.action.search.SearchResponse esResponse; + + if (client == null) { + throw new InvalidSearchException("Uninitialized Dao! You must call init() prior to use."); + } + if (groupRequest.getGroups() == null || groupRequest.getGroups().size() == 0) { + throw new InvalidSearchException("At least 1 group must be provided."); + } + + esRequest = buildGroupRequest(groupRequest, queryBuilder); + esResponse = requestSubmitter.submitSearch(esRequest); + GroupResponse response = buildGroupResponse(groupRequest, esResponse); + + return response; + } + + /** + * Builds a group search request. + * @param groupRequest The Metron group request. + * @param queryBuilder The search query. + * @return An Elasticsearch search request.
+ */ + private org.elasticsearch.action.search.SearchRequest buildGroupRequest( + GroupRequest groupRequest, + QueryBuilder queryBuilder) { + + // handle groups + TermsAggregationBuilder groups = getGroupsTermBuilder(groupRequest, 0); + final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() + .query(queryBuilder) + .aggregation(groups); + + // return the search request + String[] indices = wildcardIndices(groupRequest.getIndices()); + return new org.elasticsearch.action.search.SearchRequest() + .indices(indices) + .source(searchSourceBuilder); + } + + private TermsAggregationBuilder getGroupsTermBuilder(GroupRequest groupRequest, int index) { + List<Group> groups = groupRequest.getGroups(); + Group group = groups.get(index); + String aggregationName = getGroupByAggregationName(group.getField()); + TermsAggregationBuilder termsBuilder = AggregationBuilders.terms(aggregationName); + termsBuilder + .field(group.getField()) + .size(accessConfig.getMaxSearchGroups()) + .order(getElasticsearchGroupOrder(group.getOrder())); + if (index < groups.size() - 1) { + termsBuilder.subAggregation(getGroupsTermBuilder(groupRequest, index + 1)); + } + Optional<String> scoreField = groupRequest.getScoreField(); + if (scoreField.isPresent()) { + SumAggregationBuilder scoreSumAggregationBuilder = AggregationBuilders.sum(getSumAggregationName(scoreField.get())).field(scoreField.get()).missing(0); + termsBuilder.subAggregation(scoreSumAggregationBuilder); + } + return termsBuilder; + } + + private String getGroupByAggregationName(String field) { + return String.format("%s_group", field); + } + + private String getSumAggregationName(String field) { + return String.format("%s_score", field); + } + + private Order getElasticsearchGroupOrder(GroupOrder groupOrder) { + if (groupOrder.getGroupOrderType() == GroupOrderType.TERM) { + return groupOrder.getSortOrder() == SortOrder.ASC ?
Order.term(true) : Order.term(false); + } else { + return groupOrder.getSortOrder() == SortOrder.ASC ? Order.count(true) : Order.count(false); + } + } + + /** + * Build a group response. + * @param groupRequest The original group request. + * @param response The Elasticsearch search response. + * @return A group response. + * @throws InvalidSearchException If the column metadata for the requested indices cannot be retrieved. + */ + private GroupResponse buildGroupResponse( + GroupRequest groupRequest, + org.elasticsearch.action.search.SearchResponse response) throws InvalidSearchException { + + // build the search response + Map<String, FieldType> commonColumnMetadata; + try { + commonColumnMetadata = columnMetadataDao.getColumnMetadata(groupRequest.getIndices()); + } catch (IOException e) { + throw new InvalidSearchException(String.format("Could not get common column metadata for indices %s", + Arrays.toString(groupRequest.getIndices().toArray()))); + } + + GroupResponse groupResponse = new GroupResponse(); + groupResponse.setGroupedBy(groupRequest.getGroups().get(0).getField()); + groupResponse.setGroupResults(getGroupResults(groupRequest, 0, response.getAggregations(), commonColumnMetadata)); + return groupResponse; + } + + private List<GroupResult> getGroupResults(GroupRequest groupRequest, int index, Aggregations aggregations, Map<String, FieldType> commonColumnMetadata) { + List<Group> groups = groupRequest.getGroups(); + String field = groups.get(index).getField(); + Terms terms = aggregations.get(getGroupByAggregationName(field)); + List<GroupResult> searchResultGroups = new ArrayList<>(); + for(Bucket bucket: terms.getBuckets()) { + GroupResult groupResult = new GroupResult(); + groupResult.setKey(formatKey(bucket.getKey(), commonColumnMetadata.get(field))); + groupResult.setTotal(bucket.getDocCount()); + Optional<String> scoreField = groupRequest.getScoreField(); + if (scoreField.isPresent()) { + Sum score = bucket.getAggregations().get(getSumAggregationName(scoreField.get())); + 
groupResult.setScore(score.getValue()); + } + if (index <
groups.size() - 1) { + groupResult.setGroupedBy(groups.get(index + 1).getField()); + groupResult.setGroupResults(getGroupResults(groupRequest, index + 1, bucket.getAggregations(), commonColumnMetadata)); + } + searchResultGroups.add(groupResult); + } + return searchResultGroups; + } + + /** + * Return the search hits matching any of the given GUIDs, restricted to the "<sensorType>_doc" types when sensorTypes is non-null. + * Each hit is passed to the callback; hits for which it returns an empty Optional are dropped. + * At most guids.size() hits are requested; an empty list is returned when guids is null or empty. + */ + <T> List<T> searchByGuids(Collection<String> guids, Collection<String> sensorTypes, + Function<SearchHit, Optional<T>> callback) { + if(guids == null || guids.isEmpty()) { + return Collections.EMPTY_LIST; + } + QueryBuilder query = null; + IdsQueryBuilder idsQuery = null; + if (sensorTypes != null) { + String[] types = sensorTypes.stream().map(sensorType -> sensorType + "_doc").toArray(String[]::new); + idsQuery = QueryBuilders.idsQuery(types); + } else { + idsQuery = QueryBuilders.idsQuery(); + } + + for(String guid : guids) { + query = idsQuery.addIds(guid); + } + + SearchRequestBuilder request = client.prepareSearch() + .setQuery(query) + .setSize(guids.size()) + ; + org.elasticsearch.action.search.SearchResponse response = request.get(); + SearchHits hits = response.getHits(); + List<T> results = new ArrayList<>(); + for (SearchHit hit : hits) { + Optional<T> result = callback.apply(hit); + if (result.isPresent()) { + results.add(result.get()); + } + } + return results; + } + + private Optional<Document> toDocument(final String guid, SearchHit hit) { + Long ts = 0L; + String doc = hit.getSourceAsString(); + String sourceType = toSourceType(hit.getType()); + try { + return Optional.of(new Document(doc, guid, sourceType, ts)); + } catch (IOException e) { + throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e); + } + } + + /** + * Returns the 
source type based on a given doc type. + * @param docType The document type. + * @return The source type.
+ */ + private String toSourceType(String docType) { + return Iterables.getFirst(Splitter.on("_doc").split(docType), null); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java new file mode 100644 index 0000000..a7c3a71 --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.metron.elasticsearch.dao; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.Arrays; +import java.util.Date; +import java.util.Map; +import java.util.Optional; +import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; +import org.apache.metron.indexing.dao.AccessConfig; +import org.apache.metron.indexing.dao.update.Document; +import org.apache.metron.indexing.dao.update.UpdateDao; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo; +import org.elasticsearch.client.transport.TransportClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ElasticsearchUpdateDao implements UpdateDao { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private transient TransportClient client; + private AccessConfig accessConfig; + private ElasticsearchSearchDao searchDao; + + public ElasticsearchUpdateDao(TransportClient client, + AccessConfig accessConfig, + ElasticsearchSearchDao searchDao) { + this.client = client; + this.accessConfig = accessConfig; + this.searchDao = searchDao; + } + + @Override + public void update(Document update, Optional<String> index) throws IOException { + String indexPostfix = ElasticsearchUtils + .getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date()); + String sensorType = update.getSensorType(); + String indexName = getIndexName(update, index, indexPostfix); + + IndexRequest indexRequest = buildIndexRequest(update, sensorType, indexName); + try { + IndexResponse response = client.index(indexRequest).get(); + ShardInfo shardInfo = response.getShardInfo(); + int failed = shardInfo.getFailed(); + if (failed > 0) { + throw new
IOException( + "ElasticsearchDao index failed: " + Arrays.toString(shardInfo.getFailures())); + } + } catch (Exception e) { + if (e instanceof InterruptedException) { Thread.currentThread().interrupt(); } // preserve the thread's interrupt status before wrapping + throw new IOException(e.getMessage(), e); + } + } + + @Override + public void batchUpdate(Map<Document, Optional<String>> updates) throws IOException { + String indexPostfix = ElasticsearchUtils + .getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date()); + + BulkRequestBuilder bulkRequestBuilder = client.prepareBulk(); + + // Get the indices we'll actually be using for each Document. + for (Map.Entry<Document, Optional<String>> updateEntry : updates.entrySet()) { + Document update = updateEntry.getKey(); + String sensorType = update.getSensorType(); + String indexName = getIndexName(update, updateEntry.getValue(), indexPostfix); + IndexRequest indexRequest = buildIndexRequest( + update, + sensorType, + indexName + ); + + bulkRequestBuilder.add(indexRequest); + } + + BulkResponse bulkResponse = bulkRequestBuilder.get(); + if (bulkResponse.hasFailures()) { + LOG.error("Bulk Request has failures: {}", bulkResponse.buildFailureMessage()); + throw new IOException( + "ElasticsearchDao upsert failed: " + bulkResponse.buildFailureMessage()); + } + } + + protected String getIndexName(Document update, Optional<String> index, String indexPostFix) { + return index.orElseGet(() -> getIndexName(update.getGuid(), update.getSensorType()) // lazy: only search by guid when no index was supplied + .orElse(ElasticsearchUtils.getIndexName(update.getSensorType(), indexPostFix, null)) + ); + } + + protected Optional<String> getIndexName(String guid, String sensorType) { + return searchDao.searchByGuid(guid, + sensorType, + hit -> Optional.ofNullable(hit.getIndex()) + ); + } + + protected IndexRequest buildIndexRequest(Document update, String sensorType, String indexName) { + String type = sensorType + "_doc"; + Object ts = 
update.getTimestamp(); + IndexRequest indexRequest = new IndexRequest(indexName, type, update.getGuid()) + .source(update.getDocument()); + if(ts != null) { + indexRequest =
indexRequest.timestamp(ts.toString()); + } + + return indexRequest; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java index 2a6fb4f..ca1b860 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java @@ -27,6 +27,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import org.apache.metron.indexing.dao.AccessConfig; +import org.apache.metron.indexing.dao.ColumnMetadataDao; import org.apache.metron.indexing.dao.search.InvalidSearchException; import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; @@ -79,7 +80,7 @@ public class ElasticsearchDaoTest { when(response.getHits()).thenReturn(searchHits); // provides column metadata - ColumnMetadataDao columnMetadataDao = mock(ColumnMetadataDao.class); + ElasticsearchColumnMetadataDao columnMetadataDao = mock(ElasticsearchColumnMetadataDao.class); when(columnMetadataDao.getColumnMetadata(any())).thenReturn(metadata); // returns the search response @@ -92,7 +93,10 @@ public class ElasticsearchDaoTest { AccessConfig config = mock(AccessConfig.class); when(config.getMaxSearchResults()).thenReturn(maxSearchResults); - dao = new ElasticsearchDao(client, columnMetadataDao, requestSubmitter, config); + ElasticsearchSearchDao elasticsearchSearchDao = new ElasticsearchSearchDao(client, config, columnMetadataDao,
requestSubmitter); + ElasticsearchUpdateDao elasticsearchUpdateDao = new ElasticsearchUpdateDao(client, config, elasticsearchSearchDao); + + dao = new ElasticsearchDao(client, config, elasticsearchSearchDao, elasticsearchUpdateDao, columnMetadataDao, requestSubmitter); } private void setup(RestStatus status, int maxSearchResults) throws Exception { http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java index 1bc5b6e..5569c54 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java @@ -19,7 +19,12 @@ package org.apache.metron.elasticsearch.integration; import java.io.File; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.io.IOException; +import java.util.List; +import java.util.Map; import java.util.HashMap; import java.util.List; import java.util.concurrent.ExecutionException; @@ -31,6 +36,14 @@ import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.IndexDao; import org.apache.metron.indexing.dao.SearchIntegrationTest; import org.apache.metron.indexing.dao.search.GetRequest; +import org.apache.metron.indexing.dao.search.FieldType; +import org.apache.metron.indexing.dao.search.GroupRequest; +import org.apache.metron.indexing.dao.search.GroupResponse; +import
org.apache.metron.indexing.dao.search.GroupResult; +import org.apache.metron.indexing.dao.search.InvalidSearchException; +import org.apache.metron.indexing.dao.search.SearchRequest; +import org.apache.metron.indexing.dao.search.SearchResponse; +import org.apache.metron.indexing.dao.search.SearchResult; import org.apache.metron.integration.InMemoryComponent; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; @@ -42,6 +55,13 @@ import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.HashMap; +import java.util.concurrent.ExecutionException; +import org.junit.Assert; +import org.junit.Test; + public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { private static String indexDir = "target/elasticsearch_search"; @@ -238,5 +258,97 @@ public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { } } + @Test + public void bad_facet_query_throws_exception() throws Exception { + thrown.expect(InvalidSearchException.class); + thrown.expectMessage("Failed to execute search"); + SearchRequest request = JSONUtils.INSTANCE.load(badFacetQuery, SearchRequest.class); + dao.search(request); + } + + + + @Override + public void returns_column_metadata_for_specified_indices() throws Exception { + // getColumnMetadata with only bro + { + Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Collections.singletonList("bro")); + Assert.assertEquals(13, fieldTypes.size()); + Assert.assertEquals(FieldType.TEXT, fieldTypes.get("bro_field")); + Assert.assertEquals(FieldType.TEXT, fieldTypes.get("duplicate_name_field")); + Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("guid")); + Assert.assertEquals(FieldType.TEXT, fieldTypes.get("source:type")); + Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr")); + Assert.assertEquals(FieldType.INTEGER,
fieldTypes.get("ip_src_port")); + Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field")); + Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp")); + Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude")); + Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score")); + Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert")); + Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point")); + Assert.assertNull(fieldTypes.get("snort_field")); // snort-only fields must not leak into bro-only metadata + Assert.assertNull(fieldTypes.get("threat:triage:score")); + Assert.assertEquals(FieldType.OTHER, fieldTypes.get("alert")); + } + // getColumnMetadata with only snort + { + Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Collections.singletonList("snort")); + Assert.assertEquals(14, fieldTypes.size()); + Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("snort_field")); + Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("duplicate_name_field")); + Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("guid")); + Assert.assertEquals(FieldType.TEXT, fieldTypes.get("source:type")); + Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr")); + Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port")); + Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field")); + Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp")); + Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude")); + Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score")); + Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert")); + Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point")); + Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("threat:triage:score")); // 14th snort field, absent from bro + Assert.assertEquals(FieldType.OTHER, fieldTypes.get("alert")); + } + } + + @Override + public void returns_column_data_for_multiple_indices() throws Exception { + 
Map<String, FieldType> fieldTypes =
dao.getColumnMetadata(Arrays.asList("bro", "snort")); + Assert.assertEquals(15, fieldTypes.size()); + Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("guid")); + Assert.assertEquals(FieldType.TEXT, fieldTypes.get("source:type")); + Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr")); + Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port")); + Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field")); + Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp")); + Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude")); + Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score")); + Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert")); + Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point")); + Assert.assertEquals(FieldType.TEXT, fieldTypes.get("bro_field")); + Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("snort_field")); + //NOTE: This is because the field is in both bro and snort and they have different types.
+ Assert.assertEquals(FieldType.OTHER, fieldTypes.get("duplicate_name_field")); + Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("threat:triage:score")); + Assert.assertEquals(FieldType.OTHER, fieldTypes.get("alert")); + } + @Test + public void throws_exception_on_aggregation_queries_on_non_string_non_numeric_fields() + throws Exception { + thrown.expect(InvalidSearchException.class); + thrown.expectMessage("Failed to execute search"); + GroupRequest request = JSONUtils.INSTANCE.load(badGroupQuery, GroupRequest.class); + dao.group(request); + } + + @Test + public void different_type_filter_query() throws Exception { + SearchRequest request = JSONUtils.INSTANCE.load(differentTypeFilterQuery, SearchRequest.class); + SearchResponse response = dao.search(request); + Assert.assertEquals(1, response.getTotal()); + List<SearchResult> results = response.getResults(); + Assert.assertEquals("bro", results.get(0).getSource().get("source:type")); + Assert.assertEquals("data 1", results.get(0).getSource().get("duplicate_name_field")); + } }