Github user merrimanr commented on a diff in the pull request: https://github.com/apache/metron/pull/911#discussion_r165141915 --- Diff: metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrSearchDao.java --- @@ -0,0 +1,315 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.solr.dao; + +import com.fasterxml.jackson.core.JsonProcessingException; +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.indexing.dao.AccessConfig; +import org.apache.metron.indexing.dao.search.GetRequest; +import org.apache.metron.indexing.dao.search.Group; +import org.apache.metron.indexing.dao.search.GroupOrder; +import org.apache.metron.indexing.dao.search.GroupOrderType; +import org.apache.metron.indexing.dao.search.GroupRequest; +import org.apache.metron.indexing.dao.search.GroupResponse; +import org.apache.metron.indexing.dao.search.GroupResult; +import org.apache.metron.indexing.dao.search.InvalidSearchException; +import org.apache.metron.indexing.dao.search.SearchDao; +import org.apache.metron.indexing.dao.search.SearchRequest; +import org.apache.metron.indexing.dao.search.SearchResponse; +import org.apache.metron.indexing.dao.search.SearchResult; +import org.apache.metron.indexing.dao.search.SortField; +import org.apache.metron.indexing.dao.search.SortOrder; +import org.apache.metron.indexing.dao.update.Document; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.SolrQuery.ORDER; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.response.FacetField; +import org.apache.solr.client.solrj.response.FacetField.Count; +import org.apache.solr.client.solrj.response.PivotField; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrDocumentList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SolrSearchDao implements SearchDao { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private transient SolrClient client; + private AccessConfig accessConfig; + + public SolrSearchDao(SolrClient client, AccessConfig accessConfig) { + this.client = client; + this.accessConfig = accessConfig; + } + + @Override + public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException { + if (searchRequest.getQuery() == null) { + throw new InvalidSearchException("Search query is invalid: null"); + } + if (client == null) { + throw new InvalidSearchException("Uninitialized Dao! You must call init() prior to use."); + } + if (searchRequest.getSize() > accessConfig.getMaxSearchResults()) { + throw new InvalidSearchException( + "Search result size must be less than " + accessConfig.getMaxSearchResults()); + } + SolrQuery query = buildSearchRequest(searchRequest); + try { + QueryResponse response = client.query(query); + return buildSearchResponse(searchRequest, response); + } catch (IOException | SolrServerException e) { + String msg = e.getMessage(); + LOG.error(msg, e); + throw new InvalidSearchException(msg, e); + } + } + + @Override + public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException { + String groupNames = groupRequest.getGroups().stream().map(Group::getField).collect( + Collectors.joining(",")); + SolrQuery query = new SolrQuery() + .setStart(0) + .setRows(0) + .setQuery("*:*"); + query.set("collection", "bro,snort"); + Optional<String> scoreField = groupRequest.getScoreField(); + if (scoreField.isPresent()) { + query.set("stats", true); + query.set("stats.field", String.format("{!tag=piv1 sum=true}%s", scoreField.get())); + } + query.set("facet", true); + query.set("facet.pivot", String.format("{!stats=piv1}%s", groupNames)); + try { + QueryResponse response = client.query(query); + return buildGroupResponse(groupRequest, response); + } catch (IOException | SolrServerException e) { + String msg = e.getMessage(); + LOG.error(msg, e); + throw new InvalidSearchException(msg, e); + } + } + + @Override + public Document getLatest(String guid, String collection) throws IOException { + try { + SolrDocument solrDocument = client.getById(collection, guid); + return toDocument(solrDocument); + } catch (SolrServerException e) { + throw new IOException(e); + } + } + + @Override + public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException { + Map<String, Collection<String>> collectionIdMap = new HashMap<>(); + for (GetRequest getRequest: getRequests) { + Collection<String> ids = collectionIdMap.get(getRequest.getSensorType()); + if (ids == null) { + ids = new HashSet<>(); + } + ids.add(getRequest.getGuid()); + collectionIdMap.put(getRequest.getSensorType(), ids); + } + try { + List<Document> documents = new ArrayList<>(); + for (String collection: collectionIdMap.keySet()) { + SolrDocumentList solrDocumentList = client.getById(collectionIdMap.get(collection), + new SolrQuery().set("collection", collection)); + documents.addAll(solrDocumentList.stream().map(this::toDocument).collect(Collectors.toList())); + } + return documents; + } catch (SolrServerException e) { + throw new IOException(e); + } + } + + private SolrQuery buildSearchRequest( + SearchRequest searchRequest) throws InvalidSearchException { + SolrQuery query = new SolrQuery() + .setStart(searchRequest.getFrom()) + .setRows(searchRequest.getSize()) + .setQuery(searchRequest.getQuery()); + + // handle sort fields + for (SortField sortField : searchRequest.getSort()) { + query.addSort(sortField.getField(), getSolrSortOrder(sortField.getSortOrder())); + } + + // handle search fields + Optional<List<String>> fields = searchRequest.getFields(); + if (fields.isPresent()) { + fields.get().forEach(query::addField); + } + + //handle facet fields + Optional<List<String>> facetFields = searchRequest.getFacetFields(); + if (facetFields.isPresent()) { + facetFields.get().forEach(query::addFacetField); + } + + String collections = searchRequest.getIndices().stream().collect(Collectors.joining(",")); + query.set("collection", collections); + + return query; + } + + private SolrQuery.ORDER getSolrSortOrder( + SortOrder sortOrder) { + return sortOrder == SortOrder.DESC ? + ORDER.desc : ORDER.asc; + } + + private SearchResponse buildSearchResponse( + SearchRequest searchRequest, + QueryResponse solrResponse) throws InvalidSearchException { + + SearchResponse searchResponse = new SearchResponse(); + SolrDocumentList solrDocumentList = solrResponse.getResults(); + searchResponse.setTotal(solrDocumentList.getNumFound()); + + // search hits --> search results + List<SearchResult> results = solrDocumentList.stream() + .map(solrDocument -> getSearchResult(solrDocument, searchRequest.getFields())) + .collect(Collectors.toList()); + searchResponse.setResults(results); + + // handle facet fields + Optional<List<String>> facetFields = searchRequest.getFacetFields(); + if (facetFields.isPresent()) { + searchResponse.setFacetCounts(getFacetCounts(facetFields.get(), solrResponse)); + } + + if (LOG.isDebugEnabled()) { + String response; + try { + response = JSONUtils.INSTANCE.toJSON(searchResponse, false); + } catch (JsonProcessingException e) { + response = "???"; --- End diff -- There are several other places that use this pattern as well. I changed it to `e.getMessage()` here but wanted you to be aware.
---