http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java index 24f7a27..98dc66d 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java @@ -36,18 +36,24 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; import org.apache.commons.lang.StringUtils; import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.metron.common.utils.HDFSUtils; import org.apache.metron.common.utils.ReflectionUtils; +import org.apache.metron.indexing.dao.search.SearchResponse; +import org.apache.metron.indexing.dao.search.SearchResult; import org.apache.metron.netty.utils.NettyRuntimeWrapper; import org.apache.metron.stellar.common.utils.ConversionUtils; import org.codehaus.jackson.map.ObjectMapper; +import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.common.xcontent.XContentHelper; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.transport.client.PreBuiltTransportClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -321,4 +327,62 @@ public class ElasticsearchUtils { 
return json; }
+
+  /**
+   * Runs a query and returns every matching record in a single SearchResponse.
+   *
+   * <p>Elasticsearch queries default to 10 records returned. Some internal queries require that
+   * all results are returned. Rather than setting an arbitrarily high size, this method pages
+   * through the results with from/size requests of {@code pageSize} records each and
+   * accumulates them.
+   *
+   * <p>NOTE(review): from/size paging is bounded by the index's
+   * {@code index.max_result_window} setting; confirm callers never expect more hits than that,
+   * or move this to the scroll API.
+   *
+   * @param transportClient The client used to execute the search requests.
+   * @param qb A QueryBuilder that provides the query to be run.
+   * @param index The index to search.
+   * @param pageSize The number of records to request per page. Must be positive.
+   * @return A SearchResponse containing the total hit count and all of the results.
+   * @throws IllegalArgumentException if {@code pageSize} is not positive.
+   */
+  public static SearchResponse queryAllResults(TransportClient transportClient,
+      QueryBuilder qb,
+      String index,
+      int pageSize
+  ) {
+    // Fail fast: a zero page size would otherwise surface as a division by zero (or never
+    // advance), and a negative one would be sent to Elasticsearch as-is.
+    if (pageSize <= 0) {
+      throw new IllegalArgumentException("pageSize must be positive, but was " + pageSize);
+    }
+    SearchRequestBuilder searchRequestBuilder = transportClient
+        .prepareSearch(index)
+        .addStoredField("*")
+        .setFetchSource(true)
+        .setQuery(qb)
+        .setSize(pageSize);
+    org.elasticsearch.action.search.SearchResponse esResponse = searchRequestBuilder
+        .execute()
+        .actionGet();
+    // Copy into an ArrayList: Collectors.toList() makes no guarantee that its result is mutable.
+    List<SearchResult> allResults = new java.util.ArrayList<>(getSearchResults(esResponse));
+    long total = esResponse.getHits().getTotalHits();
+    // The first page is already loaded; request the remaining pages only while the offset is
+    // still inside the result set, so an exact multiple of pageSize costs no extra round trip.
+    for (long from = pageSize; from < total; from += pageSize) {
+      // toIntExact turns an offset beyond the int range into an error instead of a wrapped value.
+      searchRequestBuilder.setFrom(Math.toIntExact(from));
+      esResponse = searchRequestBuilder
+          .execute()
+          .actionGet();
+      allResults.addAll(getSearchResults(esResponse));
+    }
+    SearchResponse searchResponse = new SearchResponse();
+    searchResponse.setTotal(total);
+    searchResponse.setResults(allResults);
+    return searchResponse;
+  }
+
+  /**
+   * Transforms the hits of an Elasticsearch SearchResponse into a list of SearchResults.
+   *
+   * <p>Each result carries the id, source, score and index of the corresponding hit.
+   *
+   * @param searchResponse The Elasticsearch SearchResponse whose hits are to be converted.
+   * @return The list of SearchResults, one per hit in the response.
+   */
+  protected static List<SearchResult> getSearchResults(
+      org.elasticsearch.action.search.SearchResponse searchResponse) {
+    return Arrays.stream(searchResponse.getHits().getHits()).map(searchHit -> {
+          SearchResult searchResult = new SearchResult();
+          searchResult.setId(searchHit.getId());
+          searchResult.setSource(searchHit.getSource());
+          searchResult.setScore(searchHit.getScore());
+          searchResult.setIndex(searchHit.getIndex());
+          return searchResult;
+        }
+    ).collect(Collectors.toList());
+  }
}
http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java index ca1b860..6c3c327 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java @@ -17,7 +17,9 @@ */ package org.apache.metron.elasticsearch.dao; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; @@ -26,15 +28,15 @@ import static org.mockito.Mockito.when; import java.util.Arrays; import java.util.HashMap; import java.util.List; +import java.util.Map; +import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; import org.apache.metron.indexing.dao.AccessConfig; -import org.apache.metron.indexing.dao.ColumnMetadataDao; +import org.apache.metron.indexing.dao.search.FieldType; import org.apache.metron.indexing.dao.search.InvalidSearchException; import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; import org.apache.metron.indexing.dao.search.SortField; import org.apache.metron.indexing.dao.search.SortOrder; -import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; -import org.apache.metron.indexing.dao.search.FieldType; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.rest.RestStatus; 
import org.elasticsearch.search.SearchHit; @@ -45,37 +47,38 @@ import org.json.simple.parser.JSONParser; import org.junit.Test; import org.mockito.ArgumentCaptor; -import java.util.Map; - -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertNotNull; - public class ElasticsearchDaoTest { private ElasticsearchDao dao; private ElasticsearchRequestSubmitter requestSubmitter; - private void setup(RestStatus status, int maxSearchResults, Map<String, FieldType> metadata) throws Exception { + private void setup(RestStatus status, int maxSearchResults, Map<String, FieldType> metadata) + throws Exception { // setup the mock search hits SearchHit hit1 = mock(SearchHit.class); when(hit1.getId()).thenReturn("id1"); - when(hit1.getSource()).thenReturn(new HashMap<String, Object>(){{ put("field", "value1"); }}); + when(hit1.getSource()).thenReturn(new HashMap<String, Object>() {{ + put("field", "value1"); + }}); when(hit1.getScore()).thenReturn(0.1f); SearchHit hit2 = mock(SearchHit.class); when(hit2.getId()).thenReturn("id2"); - when(hit2.getSource()).thenReturn(new HashMap<String, Object>(){{ put("field", "value2"); }}); + when(hit2.getSource()).thenReturn(new HashMap<String, Object>() {{ + put("field", "value2"); + }}); when(hit2.getScore()).thenReturn(0.2f); // search hits - SearchHit[] hits = { hit1, hit2 }; + SearchHit[] hits = {hit1, hit2}; SearchHits searchHits = mock(SearchHits.class); when(searchHits.getHits()).thenReturn(hits); when(searchHits.getTotalHits()).thenReturn(Integer.toUnsignedLong(hits.length)); // search response which returns the search hits - org.elasticsearch.action.search.SearchResponse response = mock(org.elasticsearch.action.search.SearchResponse.class); + org.elasticsearch.action.search.SearchResponse response = mock( + org.elasticsearch.action.search.SearchResponse.class); when(response.status()).thenReturn(status); when(response.getHits()).thenReturn(searchHits); @@ -93,10 +96,21 @@ public class 
ElasticsearchDaoTest { AccessConfig config = mock(AccessConfig.class); when(config.getMaxSearchResults()).thenReturn(maxSearchResults); - ElasticsearchSearchDao elasticsearchSearchDao = new ElasticsearchSearchDao(client, config, columnMetadataDao, requestSubmitter); - ElasticsearchUpdateDao elasticsearchUpdateDao = new ElasticsearchUpdateDao(client, config, elasticsearchSearchDao); - - dao = new ElasticsearchDao(client, config, elasticsearchSearchDao, elasticsearchUpdateDao, columnMetadataDao, requestSubmitter); + ElasticsearchSearchDao elasticsearchSearchDao = new ElasticsearchSearchDao(client, config, + columnMetadataDao, requestSubmitter); + ElasticsearchRetrieveLatestDao elasticsearchRetrieveLatestDao = new ElasticsearchRetrieveLatestDao( + client); + ElasticsearchUpdateDao elasticsearchUpdateDao = new ElasticsearchUpdateDao(client, config, + elasticsearchRetrieveLatestDao); + + dao = new ElasticsearchDao( + client, + config, + elasticsearchSearchDao, + elasticsearchUpdateDao, + elasticsearchRetrieveLatestDao, + columnMetadataDao, + requestSubmitter); } private void setup(RestStatus status, int maxSearchResults) throws Exception { @@ -116,9 +130,9 @@ public class ElasticsearchDaoTest { // "sort by" fields for the search request SortField[] expectedSortFields = { - sortBy("sortByStringDesc", SortOrder.DESC), - sortBy("sortByIntAsc", SortOrder.ASC), - sortBy("sortByUndefinedDesc", SortOrder.DESC) + sortBy("sortByStringDesc", SortOrder.DESC), + sortBy("sortByIntAsc", SortOrder.ASC), + sortBy("sortByUndefinedDesc", SortOrder.DESC) }; // create a metron search request @@ -135,7 +149,8 @@ public class ElasticsearchDaoTest { assertNotNull(searchResponse); // capture the elasticsearch search request that was created - ArgumentCaptor<org.elasticsearch.action.search.SearchRequest> argument = ArgumentCaptor.forClass(org.elasticsearch.action.search.SearchRequest.class); + ArgumentCaptor<org.elasticsearch.action.search.SearchRequest> argument = ArgumentCaptor + 
.forClass(org.elasticsearch.action.search.SearchRequest.class); verify(requestSubmitter).submitSearch(argument.capture()); org.elasticsearch.action.search.SearchRequest request = argument.getValue(); @@ -181,9 +196,9 @@ public class ElasticsearchDaoTest { // "sort by" fields for the search request SortField[] expectedSortFields = { - sortBy("sortByStringDesc", SortOrder.DESC), - sortBy("sortByIntAsc", SortOrder.ASC), - sortBy("sortByUndefinedDesc", SortOrder.DESC) + sortBy("sortByStringDesc", SortOrder.DESC), + sortBy("sortByIntAsc", SortOrder.ASC), + sortBy("sortByUndefinedDesc", SortOrder.DESC) }; // create a metron search request @@ -200,7 +215,8 @@ public class ElasticsearchDaoTest { assertNotNull(searchResponse); // capture the elasticsearch search request that was created - ArgumentCaptor<org.elasticsearch.action.search.SearchRequest> argument = ArgumentCaptor.forClass(org.elasticsearch.action.search.SearchRequest.class); + ArgumentCaptor<org.elasticsearch.action.search.SearchRequest> argument = ArgumentCaptor + .forClass(org.elasticsearch.action.search.SearchRequest.class); verify(requestSubmitter).submitSearch(argument.capture()); org.elasticsearch.action.search.SearchRequest request = argument.getValue(); @@ -209,7 +225,7 @@ public class ElasticsearchDaoTest { JSONObject json = (JSONObject) parser.parse(ElasticsearchUtils.toJSON(request).orElse("???")); // ensure that the index names are 'wildcard-ed' - String[] expected = { "bro_index*", "snort_index*" }; + String[] expected = {"bro_index*", "snort_index*"}; assertArrayEquals(expected, request.indices()); } @@ -221,7 +237,7 @@ public class ElasticsearchDaoTest { setup(RestStatus.OK, maxSearchResults); SearchRequest searchRequest = new SearchRequest(); - searchRequest.setSize(maxSearchResults+1); + searchRequest.setSize(maxSearchResults + 1); searchRequest.setQuery(""); dao.search(searchRequest); // exception expected - size > max 
http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDaoTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDaoTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDaoTest.java index 1bfa9d6..25799ad 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDaoTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDaoTest.java @@ -18,32 +18,21 @@ package org.apache.metron.elasticsearch.dao; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; - import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.UUID; -import org.apache.metron.common.Constants; -import org.apache.metron.common.Constants.Fields; import org.apache.metron.indexing.dao.AccessConfig; +import org.apache.metron.indexing.dao.HBaseDao; import org.apache.metron.indexing.dao.IndexDao; -import org.apache.metron.indexing.dao.MetaAlertDao; +import org.apache.metron.indexing.dao.MultiIndexDao; import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest; -import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus; import org.apache.metron.indexing.dao.search.FieldType; import org.apache.metron.indexing.dao.search.GetRequest; import org.apache.metron.indexing.dao.search.GroupRequest; import org.apache.metron.indexing.dao.search.GroupResponse; import 
org.apache.metron.indexing.dao.search.InvalidCreateException; -import org.apache.metron.indexing.dao.search.InvalidSearchException; import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; import org.apache.metron.indexing.dao.update.Document; @@ -52,17 +41,16 @@ import org.junit.Test; public class ElasticsearchMetaAlertDaoTest { - @Test(expected = IllegalArgumentException.class) public void testInvalidInit() { IndexDao dao = new IndexDao() { @Override - public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException { + public SearchResponse search(SearchRequest searchRequest) { return null; } @Override - public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException { + public GroupResponse group(GroupRequest groupRequest) { return null; } @@ -71,27 +59,26 @@ public class ElasticsearchMetaAlertDaoTest { } @Override - public Document getLatest(String guid, String sensorType) throws IOException { + public Document getLatest(String guid, String sensorType) { return null; } @Override public Iterable<Document> getAllLatest( - List<GetRequest> getRequests) throws IOException { + List<GetRequest> getRequests) { return null; } @Override - public void update(Document update, Optional<String> index) throws IOException { + public void update(Document update, Optional<String> index) { } @Override - public void batchUpdate(Map<Document, Optional<String>> updates) throws IOException { + public void batchUpdate(Map<Document, Optional<String>> updates) { } @Override - public Map<String, FieldType> getColumnMetadata(List<String> indices) - throws IOException { + public Map<String, FieldType> getColumnMetadata(List<String> indices) { return null; } }; @@ -99,92 +86,11 @@ public class ElasticsearchMetaAlertDaoTest { metaAlertDao.init(dao); } - @Test - public void testBuildCreateDocumentSingleAlert() throws InvalidCreateException, IOException { - ElasticsearchDao esDao = new 
ElasticsearchDao(); - ElasticsearchMetaAlertDao emaDao = new ElasticsearchMetaAlertDao(); - emaDao.init(esDao); - - List<String> groups = new ArrayList<>(); - groups.add("group_one"); - groups.add("group_two"); - - // Build the first response from the multiget - Map<String, Object> alertOne = new HashMap<>(); - alertOne.put(Constants.GUID, "alert_one"); - alertOne.put(MetaAlertDao.THREAT_FIELD_DEFAULT, 10.0d); - List<Document> alerts = new ArrayList<Document>() {{ - add(new Document(alertOne, "", "", 0L)); - }}; - - // Actually build the doc - Document actual = emaDao.buildCreateDocument(alerts, groups); - - ArrayList<Map<String, Object>> alertList = new ArrayList<>(); - alertList.add(alertOne); - - Map<String, Object> actualDocument = actual.getDocument(); - assertEquals( - MetaAlertStatus.ACTIVE.getStatusString(), - actualDocument.get(MetaAlertDao.STATUS_FIELD) - ); - assertEquals( - alertList, - actualDocument.get(MetaAlertDao.ALERT_FIELD) - ); - assertEquals( - groups, - actualDocument.get(MetaAlertDao.GROUPS_FIELD) - ); - - // Don't care about the result, just that it's a UUID. Exception will be thrown if not. 
- UUID.fromString((String) actualDocument.get(Constants.GUID)); - } - - @Test - public void testBuildCreateDocumentMultipleAlerts() throws InvalidCreateException, IOException { - ElasticsearchDao esDao = new ElasticsearchDao(); - ElasticsearchMetaAlertDao emaDao = new ElasticsearchMetaAlertDao(); - emaDao.init(esDao); - - List<String> groups = new ArrayList<>(); - groups.add("group_one"); - groups.add("group_two"); - - // Build the first response from the multiget - Map<String, Object> alertOne = new HashMap<>(); - alertOne.put(Constants.GUID, "alert_one"); - alertOne.put(MetaAlertDao.THREAT_FIELD_DEFAULT, 10.0d); - - // Build the second response from the multiget - Map<String, Object> alertTwo = new HashMap<>(); - alertTwo.put(Constants.GUID, "alert_one"); - alertTwo.put(MetaAlertDao.THREAT_FIELD_DEFAULT, 5.0d); - List<Document> alerts = new ArrayList<Document>() {{ - add(new Document(alertOne, "", "", 0L)); - add(new Document(alertTwo, "", "", 0L)); - }}; - - // Actually build the doc - Document actual = emaDao.buildCreateDocument(alerts, groups); - - ArrayList<Map<String, Object>> alertList = new ArrayList<>(); - alertList.add(alertOne); - alertList.add(alertTwo); - - Map<String, Object> actualDocument = actual.getDocument(); - assertNotNull(actualDocument.get(Fields.TIMESTAMP.getName())); - assertEquals( - alertList, - actualDocument.get(MetaAlertDao.ALERT_FIELD) - ); - assertEquals( - groups, - actualDocument.get(MetaAlertDao.GROUPS_FIELD) - ); - - // Don't care about the result, just that it's a UUID. Exception will be thrown if not. 
- UUID.fromString((String) actualDocument.get(Constants.GUID)); + @Test(expected = IllegalArgumentException.class) + public void testInitInvalidDao() { + HBaseDao dao = new HBaseDao(); + ElasticsearchMetaAlertDao esDao = new ElasticsearchMetaAlertDao(); + esDao.init(dao, Optional.empty()); } @Test(expected = InvalidCreateException.class) @@ -200,50 +106,12 @@ public class ElasticsearchMetaAlertDaoTest { @Test(expected = InvalidCreateException.class) public void testCreateMetaAlertEmptyGroups() throws InvalidCreateException, IOException { ElasticsearchDao esDao = new ElasticsearchDao(); + MultiIndexDao miDao = new MultiIndexDao(esDao); ElasticsearchMetaAlertDao emaDao = new ElasticsearchMetaAlertDao(); - emaDao.init(esDao); + emaDao.init(miDao); MetaAlertCreateRequest createRequest = new MetaAlertCreateRequest(); createRequest.setAlerts(Collections.singletonList(new GetRequest("don't", "care"))); emaDao.createMetaAlert(createRequest); } - - @Test - public void testCalculateMetaScoresList() { - final double delta = 0.001; - List<Map<String, Object>> alertList = new ArrayList<>(); - - // add an alert with a threat score - alertList.add( Collections.singletonMap(MetaAlertDao.THREAT_FIELD_DEFAULT, 10.0f)); - - // add a second alert with a threat score - alertList.add( Collections.singletonMap(MetaAlertDao.THREAT_FIELD_DEFAULT, 20.0f)); - - // add a third alert with NO threat score - alertList.add( Collections.singletonMap("alert3", "has no threat score")); - - // create the metaalert - Map<String, Object> docMap = new HashMap<>(); - docMap.put(MetaAlertDao.ALERT_FIELD, alertList); - Document metaalert = new Document(docMap, "guid", MetaAlertDao.METAALERT_TYPE, 0L); - - // calculate the threat score for the metaalert - ElasticsearchMetaAlertDao metaAlertDao = new ElasticsearchMetaAlertDao(); - metaAlertDao.calculateMetaScores(metaalert); - Object threatScore = metaalert.getDocument().get(ElasticsearchMetaAlertDao.THREAT_FIELD_DEFAULT); - - // the metaalert must contain a 
summary of all child threat scores - assertEquals(20D, (Double) metaalert.getDocument().get("max"), delta); - assertEquals(10D, (Double) metaalert.getDocument().get("min"), delta); - assertEquals(15D, (Double) metaalert.getDocument().get("average"), delta); - assertEquals(2L, metaalert.getDocument().get("count")); - assertEquals(30D, (Double) metaalert.getDocument().get("sum"), delta); - assertEquals(15D, (Double) metaalert.getDocument().get("median"), delta); - - // it must contain an overall threat score; a float to match the type of the threat score of the other sensor indices - assertTrue(threatScore instanceof Float); - - // by default, the overall threat score is the sum of all child threat scores - assertEquals(30.0F, threatScore); - } } http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java index 9e74fb6..6fa6956 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java @@ -18,15 +18,14 @@ package org.apache.metron.elasticsearch.integration; -import static org.apache.metron.indexing.dao.MetaAlertDao.ALERT_FIELD; -import static org.apache.metron.indexing.dao.MetaAlertDao.METAALERTS_INDEX; -import static org.apache.metron.indexing.dao.MetaAlertDao.METAALERT_FIELD; -import static 
org.apache.metron.indexing.dao.MetaAlertDao.METAALERT_TYPE; -import static org.apache.metron.indexing.dao.MetaAlertDao.STATUS_FIELD; +import static org.apache.metron.elasticsearch.dao.ElasticsearchMetaAlertDao.METAALERTS_INDEX; +import static org.apache.metron.elasticsearch.dao.ElasticsearchMetaAlertDao.THREAT_TRIAGE_FIELD; +import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.ALERT_FIELD; +import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_DOC; +import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_FIELD; +import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_TYPE; import com.fasterxml.jackson.core.JsonProcessingException; -import com.google.common.base.Joiner; -import com.google.common.collect.Iterables; import java.io.File; import java.io.IOException; import java.text.SimpleDateFormat; @@ -35,11 +34,9 @@ import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.stream.Collectors; import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.common.Constants; @@ -49,23 +46,12 @@ import org.apache.metron.elasticsearch.dao.ElasticsearchMetaAlertDao; import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent; import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.IndexDao; -import org.apache.metron.indexing.dao.MetaAlertDao; -import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest; -import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse; +import org.apache.metron.indexing.dao.metaalert.MetaAlertIntegrationTest; import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus; import org.apache.metron.indexing.dao.search.GetRequest; -import 
org.apache.metron.indexing.dao.search.Group; -import org.apache.metron.indexing.dao.search.GroupRequest; -import org.apache.metron.indexing.dao.search.GroupResponse; -import org.apache.metron.indexing.dao.search.GroupResult; -import org.apache.metron.indexing.dao.search.InvalidSearchException; import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; -import org.apache.metron.indexing.dao.search.SearchResult; import org.apache.metron.indexing.dao.search.SortField; -import org.apache.metron.indexing.dao.update.Document; -import org.apache.metron.indexing.dao.update.OriginalNotFoundException; -import org.apache.metron.indexing.dao.update.PatchRequest; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -73,113 +59,50 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -public class ElasticsearchMetaAlertIntegrationTest { - - private static final int MAX_RETRIES = 10; - private static final int SLEEP_MS = 500; - private static final String SENSOR_NAME = "test"; - private static final String INDEX_DIR = "target/elasticsearch_meta"; - private static final String DATE_FORMAT = "yyyy.MM.dd.HH"; - private static final String INDEX = - SENSOR_NAME + "_index_" + new SimpleDateFormat(DATE_FORMAT).format(new Date()); - private static final String NEW_FIELD = "new-field"; - private static final String NAME_FIELD = "name"; +public class ElasticsearchMetaAlertIntegrationTest extends MetaAlertIntegrationTest { private static IndexDao esDao; - private static MetaAlertDao metaDao; private static ElasticSearchComponent es; - /** - { - "properties": { - "alert": { - "type": "nested" - } - } - } - */ - @Multiline - public static String nestedAlertMapping; + protected static final String INDEX_DIR = "target/elasticsearch_meta"; - /** - { - "guid": "meta_alert", - "index": "metaalert_index", - "patch": [ - { - "op": "add", - "path": "/name", - "value": "New Meta Alert" 
- } - ], - "sensorType": "metaalert" - } - */ - @Multiline - public static String namePatchRequest; + protected static final String INDEX = + SENSOR_NAME + "_" + new SimpleDateFormat(DATE_FORMAT).format(new Date()); + protected static final String INDEX_WITH_SEPARATOR = INDEX + "_index"; - /** - { - "guid": "meta_alert", - "index": "metaalert_index", - "patch": [ - { - "op": "add", - "path": "/name", - "value": "New Meta Alert" - }, - { - "op": "add", - "path": "/alert", - "value": [] - } - ], - "sensorType": "metaalert" - } - */ - @Multiline - public static String alertPatchRequest; + protected ArrayList<String> queryIndices = allIndices.stream().map(x -> x.replace("_index", "")) + .collect(Collectors.toCollection(ArrayList::new)); /** { - "guid": "meta_alert", - "index": "metaalert_index", - "patch": [ - { - "op": "add", - "path": "/status", - "value": "inactive" - }, - { - "op": "add", - "path": "/name", - "value": "New Meta Alert" - } - ], - "sensorType": "metaalert" + "properties": { + "alert": { + "type": "nested" + } + } } */ @Multiline - public static String statusPatchRequest; + public static String nestedAlertMapping; /** * { - "%MAPPING_NAME%_doc" : { - "properties" : { - "guid" : { - "type" : "keyword" - }, - "ip_src_addr" : { - "type" : "keyword" - }, - "score" : { - "type" : "integer" - }, - "alert" : { - "type" : "nested" - } - } - } + "%MAPPING_NAME%_doc" : { + "properties" : { + "guid" : { + "type" : "keyword" + }, + "ip_src_addr" : { + "type" : "keyword" + }, + "score" : { + "type" : "integer" + }, + "alert" : { + "type" : "nested" + } + } + } } */ @Multiline @@ -187,6 +110,9 @@ public class ElasticsearchMetaAlertIntegrationTest { @BeforeClass public static void setupBefore() throws Exception { + // Ensure ES can retry as needed. 
+ MAX_RETRIES = 10; + // setup the client es = new ElasticSearchComponent.Builder() .withHttpPort(9211) @@ -209,13 +135,17 @@ public class ElasticsearchMetaAlertIntegrationTest { esDao = new ElasticsearchDao(); esDao.init(accessConfig); - metaDao = new ElasticsearchMetaAlertDao(esDao); + ElasticsearchMetaAlertDao elasticsearchMetaDao = new ElasticsearchMetaAlertDao(esDao); + elasticsearchMetaDao.setPageSize(5); + metaDao = elasticsearchMetaDao; } @Before public void setup() throws IOException { - es.createIndexWithMapping(METAALERTS_INDEX, MetaAlertDao.METAALERT_DOC, template.replace("%MAPPING_NAME%", "metaalert")); - es.createIndexWithMapping(INDEX, "index_doc", template.replace("%MAPPING_NAME%", "index")); + es.createIndexWithMapping(METAALERTS_INDEX, METAALERT_DOC, + template.replace("%MAPPING_NAME%", "metaalert")); + es.createIndexWithMapping( + INDEX_WITH_SEPARATOR, "index_doc", template.replace("%MAPPING_NAME%", "index")); } @AfterClass @@ -230,464 +160,8 @@ public class ElasticsearchMetaAlertIntegrationTest { es.reset(); } - - @Test - public void shouldGetAllMetaAlertsForAlert() throws Exception { - // Load alerts - List<Map<String, Object>> alerts = buildAlerts(3); - elasticsearchAdd(alerts, INDEX, SENSOR_NAME); - - // Load metaAlerts - List<Map<String, Object>> metaAlerts = buildMetaAlerts(12, MetaAlertStatus.ACTIVE, - Optional.of(Collections.singletonList(alerts.get(0)))); - metaAlerts.add(buildMetaAlert("meta_active_12", MetaAlertStatus.ACTIVE, - Optional.of(Arrays.asList(alerts.get(0), alerts.get(2))))); - metaAlerts.add(buildMetaAlert("meta_inactive", MetaAlertStatus.INACTIVE, - Optional.of(Arrays.asList(alerts.get(0), alerts.get(2))))); - // We pass MetaAlertDao.METAALERT_TYPE, because the "_doc" gets appended automatically. 
- elasticsearchAdd(metaAlerts, METAALERTS_INDEX, MetaAlertDao.METAALERT_TYPE); - - // Verify load was successful - List<GetRequest> createdDocs = metaAlerts.stream().map(metaAlert -> - new GetRequest((String) metaAlert.get(Constants.GUID), METAALERT_TYPE)) - .collect(Collectors.toList()); - createdDocs.addAll(alerts.stream().map(alert -> - new GetRequest((String) alert.get(Constants.GUID), SENSOR_NAME)) - .collect(Collectors.toList())); - findCreatedDocs(createdDocs); - - int previousPageSize = ((ElasticsearchMetaAlertDao) metaDao).getPageSize(); - ((ElasticsearchMetaAlertDao) metaDao).setPageSize(5); - - { - // Verify searches successfully return more than 10 results - SearchResponse searchResponse0 = metaDao.getAllMetaAlertsForAlert("message_0"); - List<SearchResult> searchResults0 = searchResponse0.getResults(); - Assert.assertEquals(13, searchResults0.size()); - Set<Map<String, Object>> resultSet = new HashSet<>(); - Iterables.addAll(resultSet, Iterables.transform(searchResults0, r -> r.getSource())); - StringBuffer reason = new StringBuffer("Unable to find " + metaAlerts.get(0) + "\n"); - reason.append(Joiner.on("\n").join(resultSet)); - Assert.assertTrue(reason.toString(), resultSet.contains(metaAlerts.get(0))); - - // Verify no meta alerts are returned because message_1 was not added to any - SearchResponse searchResponse1 = metaDao.getAllMetaAlertsForAlert("message_1"); - List<SearchResult> searchResults1 = searchResponse1.getResults(); - Assert.assertEquals(0, searchResults1.size()); - - // Verify only the meta alert message_2 was added to is returned - SearchResponse searchResponse2 = metaDao.getAllMetaAlertsForAlert("message_2"); - List<SearchResult> searchResults2 = searchResponse2.getResults(); - Assert.assertEquals(1, searchResults2.size()); - Assert.assertEquals(metaAlerts.get(12), searchResults2.get(0).getSource()); - } - ((ElasticsearchMetaAlertDao) metaDao).setPageSize(previousPageSize); - } - - @Test - public void 
getAllMetaAlertsForAlertShouldThrowExceptionForEmtpyGuid() throws Exception { - try { - metaDao.getAllMetaAlertsForAlert(""); - Assert.fail("An exception should be thrown for empty guid"); - } catch (InvalidSearchException ise) { - Assert.assertEquals("Guid cannot be empty", ise.getMessage()); - } - } - - @Test - public void shouldCreateMetaAlert() throws Exception { - // Load alerts - List<Map<String, Object>> alerts = buildAlerts(3); - elasticsearchAdd(alerts, INDEX, SENSOR_NAME); - - // Verify load was successful - findCreatedDocs(Arrays.asList( - new GetRequest("message_0", SENSOR_NAME), - new GetRequest("message_1", SENSOR_NAME), - new GetRequest("message_2", SENSOR_NAME))); - - { - MetaAlertCreateRequest metaAlertCreateRequest = new MetaAlertCreateRequest() {{ - setAlerts(new ArrayList<GetRequest>() {{ - add(new GetRequest("message_1", SENSOR_NAME)); - add(new GetRequest("message_2", SENSOR_NAME, INDEX)); - }}); - setGroups(Collections.singletonList("group")); - }}; - MetaAlertCreateResponse metaAlertCreateResponse = metaDao.createMetaAlert(metaAlertCreateRequest); - { - // Verify metaAlert was created - findCreatedDoc(metaAlertCreateResponse.getGuid(), MetaAlertDao.METAALERT_TYPE); - } - { - // Verify alert 0 was not updated with metaalert field - Document alert = metaDao.getLatest("message_0", SENSOR_NAME); - Assert.assertEquals(4, alert.getDocument().size()); - Assert.assertNull(alert.getDocument().get(METAALERT_FIELD)); - } - { - // Verify alert 1 was properly updated with metaalert field - Document alert = metaDao.getLatest("message_1", SENSOR_NAME); - Assert.assertEquals(5, alert.getDocument().size()); - Assert.assertEquals(1, ((List) alert.getDocument().get(METAALERT_FIELD)).size()); - Assert.assertEquals(metaAlertCreateResponse.getGuid(), ((List) alert.getDocument().get(METAALERT_FIELD)).get(0)); - } - { - // Verify alert 2 was properly updated with metaalert field - Document alert = metaDao.getLatest("message_2", SENSOR_NAME); - 
Assert.assertEquals(5, alert.getDocument().size()); - Assert.assertEquals(1, ((List) alert.getDocument().get(METAALERT_FIELD)).size()); - Assert.assertEquals(metaAlertCreateResponse.getGuid(), ((List) alert.getDocument().get(METAALERT_FIELD)).get(0)); - } - } - } - - @Test - public void shouldAddAlertsToMetaAlert() throws Exception { - // Load alerts - List<Map<String, Object>> alerts = buildAlerts(4); - alerts.get(0).put(METAALERT_FIELD, Collections.singletonList("meta_alert")); - elasticsearchAdd(alerts, INDEX, SENSOR_NAME); - - // Load metaAlert - Map<String, Object> metaAlert = buildMetaAlert("meta_alert", MetaAlertStatus.ACTIVE, - Optional.of(Collections.singletonList(alerts.get(0)))); - elasticsearchAdd(Collections.singletonList(metaAlert), METAALERTS_INDEX, METAALERT_TYPE); - - // Verify load was successful - findCreatedDocs(Arrays.asList( - new GetRequest("message_0", SENSOR_NAME), - new GetRequest("message_1", SENSOR_NAME), - new GetRequest("message_2", SENSOR_NAME), - new GetRequest("message_3", SENSOR_NAME), - new GetRequest("meta_alert", METAALERT_TYPE))); - - // Build expected metaAlert after alerts are added - Map<String, Object> expectedMetaAlert = new HashMap<>(metaAlert); - - // Verify the proper alerts were added - List<Map<String, Object>> metaAlertAlerts = new ArrayList<>((List<Map<String, Object>>) expectedMetaAlert.get(ALERT_FIELD)); - Map<String, Object> expectedAlert0 = alerts.get(0); - Map<String, Object> expectedAlert1 = alerts.get(1); - expectedAlert1.put(METAALERT_FIELD, Collections.singletonList("meta_alert")); - metaAlertAlerts.add(expectedAlert1); - Map<String, Object> expectedAlert2 = alerts.get(2); - expectedAlert2.put(METAALERT_FIELD, Collections.singletonList("meta_alert")); - metaAlertAlerts.add(expectedAlert2); - expectedMetaAlert.put(ALERT_FIELD, metaAlertAlerts); - - // Verify the counts were properly updated - expectedMetaAlert.put("average", 1.0d); - expectedMetaAlert.put("min", 0.0d); - expectedMetaAlert.put("median", 
1.0d); - expectedMetaAlert.put("max", 2.0d); - expectedMetaAlert.put("count", 3); - expectedMetaAlert.put("sum", 3.0d); - expectedMetaAlert.put("threat:triage:score", 3.0d); - - { - // Verify alerts were successfully added to the meta alert - Assert.assertTrue(metaDao.addAlertsToMetaAlert("meta_alert", Arrays.asList(new GetRequest("message_1", SENSOR_NAME), new GetRequest("message_2", SENSOR_NAME)))); - findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE); - } - - { - // Verify False when alerts are already in a meta alert and no new alerts are added - Assert.assertFalse(metaDao.addAlertsToMetaAlert("meta_alert", Arrays.asList(new GetRequest("message_0", SENSOR_NAME), new GetRequest("message_1", SENSOR_NAME)))); - findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE); - } - - { - // Verify only 1 alert is added when a list of alerts only contains 1 alert that is not in the meta alert - metaAlertAlerts = new ArrayList<>((List<Map<String, Object>>) expectedMetaAlert.get(ALERT_FIELD)); - Map<String, Object> expectedAlert3 = alerts.get(3); - expectedAlert3.put(METAALERT_FIELD, Collections.singletonList("meta_alert")); - metaAlertAlerts.add(expectedAlert3); - expectedMetaAlert.put(ALERT_FIELD, metaAlertAlerts); - - expectedMetaAlert.put("average", 1.5d); - expectedMetaAlert.put("min", 0.0d); - expectedMetaAlert.put("median", 1.5d); - expectedMetaAlert.put("max", 3.0d); - expectedMetaAlert.put("count", 4); - expectedMetaAlert.put("sum", 6.0d); - expectedMetaAlert.put("threat:triage:score", 6.0d); - - Assert.assertTrue(metaDao.addAlertsToMetaAlert("meta_alert", Arrays.asList(new GetRequest("message_2", SENSOR_NAME), new GetRequest("message_3", SENSOR_NAME)))); - findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE); - } - - } - - @Test - public void shouldRemoveAlertsFromMetaAlert() throws Exception { - // Load alerts - List<Map<String, Object>> alerts = buildAlerts(4); - alerts.get(0).put(METAALERT_FIELD, 
Collections.singletonList("meta_alert")); - alerts.get(1).put(METAALERT_FIELD, Collections.singletonList("meta_alert")); - alerts.get(2).put(METAALERT_FIELD, Collections.singletonList("meta_alert")); - alerts.get(3).put(METAALERT_FIELD, Collections.singletonList("meta_alert")); - elasticsearchAdd(alerts, INDEX, SENSOR_NAME); - - // Load metaAlert - Map<String, Object> metaAlert = buildMetaAlert("meta_alert", MetaAlertStatus.ACTIVE, - Optional.of(Arrays.asList(alerts.get(0), alerts.get(1), alerts.get(2), alerts.get(3)))); - elasticsearchAdd(Collections.singletonList(metaAlert), METAALERTS_INDEX, METAALERT_TYPE); - - // Verify load was successful - findCreatedDocs(Arrays.asList( - new GetRequest("message_0", SENSOR_NAME), - new GetRequest("message_1", SENSOR_NAME), - new GetRequest("message_2", SENSOR_NAME), - new GetRequest("message_3", SENSOR_NAME), - new GetRequest("meta_alert", METAALERT_TYPE))); - - // Build expected metaAlert after alerts are added - Map<String, Object> expectedMetaAlert = new HashMap<>(metaAlert); - - // Verify the proper alerts were added - List<Map<String, Object>> metaAlertAlerts = new ArrayList<>((List<Map<String, Object>>) expectedMetaAlert.get(ALERT_FIELD)); - metaAlertAlerts.remove(0); - metaAlertAlerts.remove(0); - expectedMetaAlert.put(ALERT_FIELD, metaAlertAlerts); - - // Verify the counts were properly updated - expectedMetaAlert.put("average", 2.5d); - expectedMetaAlert.put("min", 2.0d); - expectedMetaAlert.put("median", 2.5d); - expectedMetaAlert.put("max", 3.0d); - expectedMetaAlert.put("count", 2); - expectedMetaAlert.put("sum", 5.0d); - expectedMetaAlert.put("threat:triage:score", 5.0d); - - - { - // Verify a list of alerts are removed from a meta alert - Assert.assertTrue(metaDao.removeAlertsFromMetaAlert("meta_alert", Arrays.asList(new GetRequest("message_0", SENSOR_NAME), new GetRequest("message_1", SENSOR_NAME)))); - findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE); - } - - { - // Verify False when alerts are 
not present in a meta alert and no alerts are removed - Assert.assertFalse(metaDao.removeAlertsFromMetaAlert("meta_alert", Arrays.asList(new GetRequest("message_0", SENSOR_NAME), new GetRequest("message_1", SENSOR_NAME)))); - findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE); - } - - { - // Verify only 1 alert is removed when a list of alerts only contains 1 alert that is in the meta alert - metaAlertAlerts = new ArrayList<>((List<Map<String, Object>>) expectedMetaAlert.get(ALERT_FIELD)); - metaAlertAlerts.remove(0); - expectedMetaAlert.put(ALERT_FIELD, metaAlertAlerts); - - expectedMetaAlert.put("average", 3.0d); - expectedMetaAlert.put("min", 3.0d); - expectedMetaAlert.put("median", 3.0d); - expectedMetaAlert.put("max", 3.0d); - expectedMetaAlert.put("count", 1); - expectedMetaAlert.put("sum", 3.0d); - expectedMetaAlert.put("threat:triage:score", 3.0d); - - Assert.assertTrue(metaDao.removeAlertsFromMetaAlert("meta_alert", Arrays.asList(new GetRequest("message_0", SENSOR_NAME), new GetRequest("message_2", SENSOR_NAME)))); - findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE); - } - - { - // Verify all alerts are removed from a metaAlert - metaAlertAlerts = new ArrayList<>((List<Map<String, Object>>) expectedMetaAlert.get(ALERT_FIELD)); - metaAlertAlerts.remove(0); - expectedMetaAlert.put(ALERT_FIELD, metaAlertAlerts); - - expectedMetaAlert.put("average", 0.0d); - expectedMetaAlert.put("min", "Infinity"); - expectedMetaAlert.put("median", "NaN"); - expectedMetaAlert.put("max", "-Infinity"); - expectedMetaAlert.put("count", 0); - expectedMetaAlert.put("sum", 0.0d); - expectedMetaAlert.put("threat:triage:score", 0.0d); - - Assert.assertTrue(metaDao.removeAlertsFromMetaAlert("meta_alert", - Collections.singletonList(new GetRequest("message_3", SENSOR_NAME)))); - findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE); - } - - } - - @Test - public void addRemoveAlertsShouldThrowExceptionForInactiveMetaAlert() throws Exception { - // 
Load alerts - List<Map<String, Object>> alerts = buildAlerts(2); - alerts.get(0).put(METAALERT_FIELD, Collections.singletonList("meta_alert")); - elasticsearchAdd(alerts, INDEX, SENSOR_NAME); - - // Load metaAlert - Map<String, Object> metaAlert = buildMetaAlert("meta_alert", MetaAlertStatus.INACTIVE, - Optional.of(Collections.singletonList(alerts.get(0)))); - elasticsearchAdd(Collections.singletonList(metaAlert), METAALERTS_INDEX, METAALERT_TYPE); - - // Verify load was successful - findCreatedDocs(Arrays.asList( - new GetRequest("message_0", SENSOR_NAME), - new GetRequest("message_1", SENSOR_NAME), - new GetRequest("meta_alert", METAALERT_TYPE))); - - { - // Verify alerts cannot be added to an INACTIVE meta alert - try { - metaDao.addAlertsToMetaAlert("meta_alert", - Collections.singletonList(new GetRequest("message_1", SENSOR_NAME))); - Assert.fail("Adding alerts to an inactive meta alert should throw an exception"); - } catch (IllegalStateException ise) { - Assert.assertEquals("Adding alerts to an INACTIVE meta alert is not allowed", ise.getMessage()); - } - } - - { - // Verify alerts cannot be removed from an INACTIVE meta alert - try { - metaDao.removeAlertsFromMetaAlert("meta_alert", - Collections.singletonList(new GetRequest("message_0", SENSOR_NAME))); - Assert.fail("Removing alerts from an inactive meta alert should throw an exception"); - } catch (IllegalStateException ise) { - Assert.assertEquals("Removing alerts from an INACTIVE meta alert is not allowed", ise.getMessage()); - } - } - } - - @Test - public void shouldUpdateMetaAlertStatus() throws Exception { - int numChildAlerts = 25; - int numUnrelatedAlerts = 25; - int totalAlerts = numChildAlerts + numUnrelatedAlerts; - - // Load alerts - List<Map<String, Object>> alerts = buildAlerts(totalAlerts); - List<Map<String, Object>> childAlerts = alerts.subList(0, numChildAlerts); - List<Map<String, Object>> unrelatedAlerts = alerts.subList(numChildAlerts, totalAlerts); - for (Map<String, Object> alert : 
childAlerts) { - alert.put(METAALERT_FIELD, Collections.singletonList("meta_alert")); - } - elasticsearchAdd(alerts, INDEX, SENSOR_NAME); - - // Load metaAlerts - Map<String, Object> metaAlert = buildMetaAlert("meta_alert", MetaAlertStatus.ACTIVE, - Optional.of(childAlerts)); - // We pass MetaAlertDao.METAALERT_TYPE, because the "_doc" gets appended automatically. - elasticsearchAdd(Collections.singletonList(metaAlert), METAALERTS_INDEX, - MetaAlertDao.METAALERT_TYPE); - - List<GetRequest> requests = new ArrayList<>(); - for (int i = 0; i < numChildAlerts; ++i) { - requests.add(new GetRequest("message_" + i, SENSOR_NAME)); - } - requests.add(new GetRequest("meta_alert", METAALERT_TYPE)); - - // Verify load was successful - findCreatedDocs(requests); - - { - // Verify status changed to inactive and child alerts are updated - Assert.assertTrue(metaDao.updateMetaAlertStatus("meta_alert", MetaAlertStatus.INACTIVE)); - - Map<String, Object> expectedMetaAlert = new HashMap<>(metaAlert); - expectedMetaAlert.put(STATUS_FIELD, MetaAlertStatus.INACTIVE.getStatusString()); - - findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE); - - for (int i = 0; i < numChildAlerts; ++i) { - Map<String, Object> expectedAlert = new HashMap<>(childAlerts.get(i)); - expectedAlert.put("metaalerts", new ArrayList()); - findUpdatedDoc(expectedAlert, "message_" + i, SENSOR_NAME); - } - - // Ensure unrelated alerts are unaffected - for (int i = 0; i < numUnrelatedAlerts; ++i) { - Map<String, Object> expectedAlert = new HashMap<>(unrelatedAlerts.get(i)); - // Make sure to handle the guid offset from creation - findUpdatedDoc(expectedAlert, "message_" + (i + numChildAlerts), SENSOR_NAME); - } - } - - { - // Verify status changed to active and child alerts are updated - Assert.assertTrue(metaDao.updateMetaAlertStatus("meta_alert", MetaAlertStatus.ACTIVE)); - - Map<String, Object> expectedMetaAlert = new HashMap<>(metaAlert); - expectedMetaAlert.put(STATUS_FIELD, 
MetaAlertStatus.ACTIVE.getStatusString()); - - findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE); - - for (int i = 0; i < numChildAlerts; ++i) { - Map<String, Object> expectedAlert = new HashMap<>(alerts.get(i)); - expectedAlert.put("metaalerts", Collections.singletonList("meta_alert")); - findUpdatedDoc(expectedAlert, "message_" + i, SENSOR_NAME); - } - - // Ensure unrelated alerts are unaffected - for (int i = 0; i < numUnrelatedAlerts; ++i) { - Map<String, Object> expectedAlert = new HashMap<>(unrelatedAlerts.get(i)); - // Make sure to handle the guid offset from creation - findUpdatedDoc(expectedAlert, "message_" + (i + numChildAlerts), SENSOR_NAME); - } - - { - // Verify status changed to current status has no effect - Assert.assertFalse(metaDao.updateMetaAlertStatus("meta_alert", MetaAlertStatus.ACTIVE)); - - findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE); - - for (int i = 0; i < numChildAlerts; ++i) { - Map<String, Object> expectedAlert = new HashMap<>(alerts.get(i)); - expectedAlert.put("metaalerts", Collections.singletonList("meta_alert")); - findUpdatedDoc(expectedAlert, "message_" + i, SENSOR_NAME); - } - - // Ensure unrelated alerts are unaffected - for (int i = 0; i < numUnrelatedAlerts; ++i) { - Map<String, Object> expectedAlert = new HashMap<>(unrelatedAlerts.get(i)); - // Make sure to handle the guid offset from creation - findUpdatedDoc(expectedAlert, "message_" + (i + numChildAlerts), SENSOR_NAME); - } - } - } - } - - @Test - public void shouldSearchByStatus() throws Exception { - // Load metaAlerts - Map<String, Object> activeMetaAlert = buildMetaAlert("meta_active", MetaAlertStatus.ACTIVE, - Optional.empty()); - Map<String, Object> inactiveMetaAlert = buildMetaAlert("meta_inactive", MetaAlertStatus.INACTIVE, - Optional.empty()); - - - // We pass MetaAlertDao.METAALERT_TYPE, because the "_doc" gets appended automatically. 
- elasticsearchAdd(Arrays.asList(activeMetaAlert, inactiveMetaAlert), METAALERTS_INDEX, MetaAlertDao.METAALERT_TYPE); - - // Verify load was successful - findCreatedDocs(Arrays.asList( - new GetRequest("meta_active", METAALERT_TYPE), - new GetRequest("meta_inactive", METAALERT_TYPE))); - - SearchResponse searchResponse = metaDao.search(new SearchRequest() { - { - setQuery("*"); - setIndices(Collections.singletonList(MetaAlertDao.METAALERT_TYPE)); - setFrom(0); - setSize(5); - setSort(Collections.singletonList(new SortField() {{ - setField(Constants.GUID); - }})); - } - }); - - // Verify only active meta alerts are returned - Assert.assertEquals(1, searchResponse.getTotal()); - Assert.assertEquals(MetaAlertStatus.ACTIVE.getStatusString(), - searchResponse.getResults().get(0).getSource().get(MetaAlertDao.STATUS_FIELD)); - } - - @Test + @Override public void shouldSearchByNestedAlert() throws Exception { // Load alerts List<Map<String, Object>> alerts = buildAlerts(4); @@ -701,21 +175,20 @@ public class ElasticsearchMetaAlertIntegrationTest { alerts.get(2).put("ip_src_port", 8008); alerts.get(3).put("ip_src_addr", "192.168.1.4"); alerts.get(3).put("ip_src_port", 8007); - elasticsearchAdd(alerts, INDEX, SENSOR_NAME); + addRecords(alerts, INDEX_WITH_SEPARATOR, SENSOR_NAME); // Put the nested type into the test index, so that it'll match appropriately - ((ElasticsearchDao) esDao).getClient().admin().indices().preparePutMapping(INDEX) - .setType("test_doc") - .setSource(nestedAlertMapping) - .get(); + setupTypings(); // Load metaAlerts Map<String, Object> activeMetaAlert = buildMetaAlert("meta_active", MetaAlertStatus.ACTIVE, Optional.of(Arrays.asList(alerts.get(0), alerts.get(1)))); - Map<String, Object> inactiveMetaAlert = buildMetaAlert("meta_inactive", MetaAlertStatus.INACTIVE, + Map<String, Object> inactiveMetaAlert = buildMetaAlert("meta_inactive", + MetaAlertStatus.INACTIVE, Optional.of(Arrays.asList(alerts.get(2), alerts.get(3)))); // We pass 
MetaAlertDao.METAALERT_TYPE, because the "_doc" gets appended automatically. - elasticsearchAdd(Arrays.asList(activeMetaAlert, inactiveMetaAlert), METAALERTS_INDEX, MetaAlertDao.METAALERT_TYPE); + addRecords(Arrays.asList(activeMetaAlert, inactiveMetaAlert), METAALERTS_INDEX, + METAALERT_TYPE); // Verify load was successful findCreatedDocs(Arrays.asList( @@ -726,12 +199,11 @@ public class ElasticsearchMetaAlertIntegrationTest { new GetRequest("meta_active", METAALERT_TYPE), new GetRequest("meta_inactive", METAALERT_TYPE))); - SearchResponse searchResponse = metaDao.search(new SearchRequest() { { setQuery( "(ip_src_addr:192.168.1.1 AND ip_src_port:8009) OR (alert.ip_src_addr:192.168.1.1 AND alert.ip_src_port:8009)"); - setIndices(Collections.singletonList(MetaAlertDao.METAALERT_TYPE)); + setIndices(Collections.singletonList(METAALERT_TYPE)); setFrom(0); setSize(5); setSort(Collections.singletonList(new SortField() { @@ -751,7 +223,7 @@ public class ElasticsearchMetaAlertIntegrationTest { setQuery( "(ip_src_addr:192.168.1.1 AND ip_src_port:8010)" + " OR (alert.ip_src_addr:192.168.1.1 AND alert.ip_src_port:8010)"); - setIndices(Collections.singletonList("*")); + setIndices(queryIndices); setFrom(0); setSize(5); setSort(Collections.singletonList(new SortField() { @@ -769,12 +241,12 @@ public class ElasticsearchMetaAlertIntegrationTest { // Query against all indices. The child alert has no actual attached meta alerts, and should // be returned on its own. 
- searchResponse = metaDao.search(new SearchRequest() { + searchResponse = metaDao.search(new SearchRequest() { { setQuery( "(ip_src_addr:192.168.1.3 AND ip_src_port:8008)" + " OR (alert.ip_src_addr:192.168.1.3 AND alert.ip_src_port:8008)"); - setIndices(Collections.singletonList("*")); + setIndices(queryIndices); setFrom(0); setSize(1); setSort(Collections.singletonList(new SortField() { @@ -791,221 +263,13 @@ public class ElasticsearchMetaAlertIntegrationTest { searchResponse.getResults().get(0).getSource().get("guid")); } - @Test - public void shouldHidesAlertsOnGroup() throws Exception { - // Load alerts - List<Map<String, Object>> alerts = buildAlerts(2); - alerts.get(0).put(METAALERT_FIELD, Collections.singletonList("meta_active")); - alerts.get(0).put("ip_src_addr", "192.168.1.1"); - alerts.get(0).put("score_field", 1); - alerts.get(1).put("ip_src_addr", "192.168.1.1"); - alerts.get(1).put("score_field", 10); - elasticsearchAdd(alerts, INDEX, SENSOR_NAME); - - - // Put the nested type into the test index, so that it'll match appropriately - ((ElasticsearchDao) esDao).getClient().admin().indices().preparePutMapping(INDEX) - .setType("test_doc") - .setSource(nestedAlertMapping) - .get(); - - // Don't need any meta alerts to actually exist, since we've populated the field on the alerts. 
- - // Verify load was successful - findCreatedDocs(Arrays.asList( - new GetRequest("message_0", SENSOR_NAME), - new GetRequest("message_1", SENSOR_NAME))); - - // Build our group request - Group searchGroup = new Group(); - searchGroup.setField("ip_src_addr"); - List<Group> groupList = new ArrayList<>(); - groupList.add(searchGroup); - GroupResponse groupResponse = metaDao.group(new GroupRequest() { - { - setQuery("ip_src_addr:192.168.1.1"); - setIndices(Collections.singletonList("*")); - setScoreField("score_field"); - setGroups(groupList); - }}); - - // Should only return the standalone alert in the group - GroupResult result = groupResponse.getGroupResults().get(0); - Assert.assertEquals(1, result.getTotal()); - Assert.assertEquals("192.168.1.1", result.getKey()); - // No delta, since no ops happen - Assert.assertEquals(10.0d, result.getScore(), 0.0d); - } - - @SuppressWarnings("unchecked") - @Test - public void shouldUpdateMetaAlertOnAlertUpdate() throws Exception { - // Load alerts - List<Map<String, Object>> alerts = buildAlerts(2); - alerts.get(0).put(METAALERT_FIELD, Arrays.asList("meta_active", "meta_inactive")); - elasticsearchAdd(alerts, INDEX, SENSOR_NAME); - - // Load metaAlerts - Map<String, Object> activeMetaAlert = buildMetaAlert("meta_active", MetaAlertStatus.ACTIVE, - Optional.of(Collections.singletonList(alerts.get(0)))); - Map<String, Object> inactiveMetaAlert = buildMetaAlert("meta_inactive", MetaAlertStatus.INACTIVE, - Optional.of(Collections.singletonList(alerts.get(0)))); - // We pass MetaAlertDao.METAALERT_TYPE, because the "_doc" gets appended automatically. 
- elasticsearchAdd(Arrays.asList(activeMetaAlert, inactiveMetaAlert), METAALERTS_INDEX, METAALERT_TYPE); - - // Verify load was successful - findCreatedDocs(Arrays.asList( - new GetRequest("message_0", SENSOR_NAME), - new GetRequest("message_1", SENSOR_NAME), - new GetRequest("meta_active", METAALERT_TYPE), - new GetRequest("meta_inactive", METAALERT_TYPE))); - - { - // Modify the first message and add a new field - Map<String, Object> message0 = new HashMap<String, Object>(alerts.get(0)) { - { - put(NEW_FIELD, "metron"); - put(MetaAlertDao.THREAT_FIELD_DEFAULT, "10"); - } - }; - String guid = "" + message0.get(Constants.GUID); - metaDao.update(new Document(message0, guid, SENSOR_NAME, null), Optional.empty()); - - { - // Verify alerts in ES are up-to-date - findUpdatedDoc(message0, guid, SENSOR_NAME); - long cnt = getMatchingAlertCount(NEW_FIELD, message0.get(NEW_FIELD)); - if (cnt == 0) { - Assert.fail("Elasticsearch alert not updated!"); - } - } - - { - // Verify meta alerts in ES are up-to-date - long cnt = getMatchingMetaAlertCount(NEW_FIELD, "metron"); - if (cnt == 0) { - Assert.fail("Active metaalert was not updated!"); - } - if (cnt != 1) { - Assert.fail("Elasticsearch metaalerts not updated correctly!"); - } - } - } - //modify the same message and modify the new field - { - Map<String, Object> message0 = new HashMap<String, Object>(alerts.get(0)) { - { - put(NEW_FIELD, "metron2"); - } - }; - String guid = "" + message0.get(Constants.GUID); - metaDao.update(new Document(message0, guid, SENSOR_NAME, null), Optional.empty()); - - { - // Verify ES is up-to-date - findUpdatedDoc(message0, guid, SENSOR_NAME); - long cnt = getMatchingAlertCount(NEW_FIELD, message0.get(NEW_FIELD)); - if (cnt == 0) { - Assert.fail("Elasticsearch alert not updated!"); - } - } - { - // Verify meta alerts in ES are up-to-date - long cnt = getMatchingMetaAlertCount(NEW_FIELD, "metron2"); - if (cnt == 0) { - Assert.fail("Active metaalert was not updated!"); - } - if (cnt != 1) { - 
Assert.fail("Elasticsearch metaalerts not updated correctly!"); - } - } - } - } - - @Test - public void shouldThrowExceptionOnMetaAlertUpdate() throws Exception { - Document metaAlert = new Document(new HashMap<>(), "meta_alert", METAALERT_TYPE, 0L); - try { - // Verify a meta alert cannot be updated in the meta alert dao - metaDao.update(metaAlert, Optional.empty()); - Assert.fail("Direct meta alert update should throw an exception"); - } catch (UnsupportedOperationException uoe) { - Assert.assertEquals("Meta alerts cannot be directly updated", uoe.getMessage()); - } - } - @Test - public void shouldPatchAllowedMetaAlerts() throws Exception { - // Load alerts - List<Map<String, Object>> alerts = buildAlerts(2); - alerts.get(0).put(METAALERT_FIELD, Collections.singletonList("meta_active")); - alerts.get(1).put(METAALERT_FIELD, Collections.singletonList("meta_active")); - elasticsearchAdd(alerts, INDEX, SENSOR_NAME); - - // Put the nested type into the test index, so that it'll match appropriately - ((ElasticsearchDao) esDao).getClient().admin().indices().preparePutMapping(INDEX) - .setType("test_doc") - .setSource(nestedAlertMapping) - .get(); - - // Load metaAlerts - Map<String, Object> metaAlert = buildMetaAlert("meta_alert", MetaAlertStatus.ACTIVE, - Optional.of(Arrays.asList(alerts.get(0), alerts.get(1)))); - // We pass MetaAlertDao.METAALERT_TYPE, because the "_doc" gets appended automatically. 
- elasticsearchAdd(Collections.singletonList(metaAlert), METAALERTS_INDEX, MetaAlertDao.METAALERT_TYPE); - - // Verify load was successful - findCreatedDocs(Arrays.asList( - new GetRequest("message_0", SENSOR_NAME), - new GetRequest("message_1", SENSOR_NAME), - new GetRequest("meta_alert", METAALERT_TYPE))); - - Map<String, Object> expectedMetaAlert = new HashMap<>(metaAlert); - expectedMetaAlert.put(NAME_FIELD, "New Meta Alert"); - { - // Verify a patch to a field other than "status" or "alert" can be patched - PatchRequest patchRequest = JSONUtils.INSTANCE.load(namePatchRequest, PatchRequest.class); - metaDao.patch(patchRequest, Optional.of(System.currentTimeMillis())); - - findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE); - } - - { - // Verify a patch to an alert field should throw an exception - try { - PatchRequest patchRequest = JSONUtils.INSTANCE.load(alertPatchRequest, PatchRequest.class); - metaDao.patch(patchRequest, Optional.of(System.currentTimeMillis())); - - Assert.fail("A patch on the alert field should throw an exception"); - } catch (IllegalArgumentException iae) { - Assert.assertEquals("Meta alert patches are not allowed for /alert or /status paths. " - + "Please use the add/remove alert or update status functions instead.", iae.getMessage()); - } - - // Verify the metaAlert was not updated - findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE); - } - - { - // Verify a patch to a status field should throw an exception - try { - PatchRequest patchRequest = JSONUtils.INSTANCE.load(statusPatchRequest, PatchRequest.class); - metaDao.patch(patchRequest, Optional.of(System.currentTimeMillis())); - - Assert.fail("A patch on the status field should throw an exception"); - } catch (IllegalArgumentException iae) { - Assert.assertEquals("Meta alert patches are not allowed for /alert or /status paths. 
" - + "Please use the add/remove alert or update status functions instead.", iae.getMessage()); - } - - // Verify the metaAlert was not updated - findUpdatedDoc(expectedMetaAlert, "meta_alert", METAALERT_TYPE); - } - } - - protected long getMatchingAlertCount(String fieldName, Object fieldValue) throws IOException, InterruptedException { + @Override + protected long getMatchingAlertCount(String fieldName, Object fieldValue) + throws IOException, InterruptedException { long cnt = 0; for (int t = 0; t < MAX_RETRIES && cnt == 0; ++t, Thread.sleep(SLEEP_MS)) { - List<Map<String, Object>> docs = es.getAllIndexedDocs(INDEX, SENSOR_NAME + "_doc"); + List<Map<String, Object>> docs = es + .getAllIndexedDocs(INDEX_WITH_SEPARATOR, SENSOR_NAME + "_doc"); cnt = docs .stream() .filter(d -> { @@ -1016,15 +280,19 @@ public class ElasticsearchMetaAlertIntegrationTest { return cnt; } - protected long getMatchingMetaAlertCount(String fieldName, String fieldValue) throws IOException, InterruptedException { + @Override + protected long getMatchingMetaAlertCount(String fieldName, String fieldValue) + throws IOException, InterruptedException { long cnt = 0; for (int t = 0; t < MAX_RETRIES && cnt == 0; ++t, Thread.sleep(SLEEP_MS)) { - List<Map<String, Object>> docs = es.getAllIndexedDocs(METAALERTS_INDEX, MetaAlertDao.METAALERT_DOC); + List<Map<String, Object>> docs = es + .getAllIndexedDocs(METAALERTS_INDEX, METAALERT_DOC); cnt = docs .stream() .filter(d -> { + @SuppressWarnings("unchecked") List<Map<String, Object>> alerts = (List<Map<String, Object>>) d - .get(MetaAlertDao.ALERT_FIELD); + .get(ALERT_FIELD); for (Map<String, Object> alert : alerts) { Object newField = alert.get(fieldName); @@ -1039,90 +307,60 @@ public class ElasticsearchMetaAlertIntegrationTest { return cnt; } - protected void findUpdatedDoc(Map<String, Object> message0, String guid, String sensorType) - throws InterruptedException, IOException, OriginalNotFoundException { - for (int t = 0; t < MAX_RETRIES; ++t, 
Thread.sleep(SLEEP_MS)) { - Document doc = metaDao.getLatest(guid, sensorType); - if (doc != null && message0.equals(doc.getDocument())) { - return; - } - } - throw new OriginalNotFoundException("Count not find " + guid + " after " + MAX_RETRIES + " tries"); + @Override + protected void addRecords(List<Map<String, Object>> inputData, String index, String docType) + throws IOException { + es.add(index, docType, inputData.stream().map(m -> { + try { + return JSONUtils.INSTANCE.toJSON(m, true); + } catch (JsonProcessingException e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + ).collect(Collectors.toList()) + ); } - protected boolean findCreatedDoc(String guid, String sensorType) - throws InterruptedException, IOException, OriginalNotFoundException { - for (int t = 0; t < MAX_RETRIES; ++t, Thread.sleep(SLEEP_MS)) { - Document doc = metaDao.getLatest(guid, sensorType); - if (doc != null) { - return true; - } - } - throw new OriginalNotFoundException("Count not find " + guid + " after " + MAX_RETRIES + "tries"); + @Override + protected void setupTypings() { + ((ElasticsearchDao) esDao).getClient().admin().indices().preparePutMapping(INDEX_WITH_SEPARATOR) + .setType("test_doc") + .setSource(nestedAlertMapping) + .get(); } - protected boolean findCreatedDocs(List<GetRequest> getRequests) - throws InterruptedException, IOException, OriginalNotFoundException { - for (int t = 0; t < MAX_RETRIES; ++t, Thread.sleep(SLEEP_MS)) { - Iterable<Document> docs = metaDao.getAllLatest(getRequests); - if (docs != null) { - int docCount = 0; - for (Document doc: docs) { - docCount++; - } - if (getRequests.size() == docCount) { - return true; - } - } - } - throw new OriginalNotFoundException("Count not find guids after " + MAX_RETRIES + "tries"); + @Override + protected String getTestIndexName() { + return INDEX; } - protected List<Map<String, Object>> buildAlerts(int count) { - List<Map<String, Object>> inputData = new ArrayList<>(); - for (int i = 0; i < count; ++i) 
{ - final String guid = "message_" + i; - Map<String, Object> alerts = new HashMap<>(); - alerts.put(Constants.GUID, guid); - alerts.put("source:type", SENSOR_NAME); - alerts.put(MetaAlertDao.THREAT_FIELD_DEFAULT, i); - alerts.put("timestamp", System.currentTimeMillis()); - inputData.add(alerts); - } - return inputData; + @Override + protected String getTestIndexFullName() { + return INDEX_WITH_SEPARATOR; } - protected List<Map<String, Object>> buildMetaAlerts(int count, MetaAlertStatus status, Optional<List<Map<String, Object>>> alerts) { - List<Map<String, Object>> inputData = new ArrayList<>(); - for (int i = 0; i < count; ++i) { - final String guid = "meta_" + status.getStatusString() + "_" + i; - inputData.add(buildMetaAlert(guid, status, alerts)); - } - return inputData; + @Override + protected String getMetaAlertIndex() { + return METAALERTS_INDEX; } - protected Map<String, Object> buildMetaAlert(String guid, MetaAlertStatus status, Optional<List<Map<String, Object>>> alerts) { - Map<String, Object> metaAlert = new HashMap<>(); - metaAlert.put(Constants.GUID, guid); - metaAlert.put("source:type", METAALERT_TYPE); - metaAlert.put(MetaAlertDao.STATUS_FIELD, status.getStatusString()); - if (alerts.isPresent()) { - List<Map<String, Object>> alertsList = alerts.get(); - metaAlert.put(ALERT_FIELD, alertsList); - } - return metaAlert; + @Override + protected String getSourceTypeField() { + return ElasticsearchMetaAlertDao.SOURCE_TYPE_FIELD; } - protected void elasticsearchAdd(List<Map<String, Object>> inputData, String index, String docType) - throws IOException { - es.add(index, docType, inputData.stream().map(m -> { - try { - return JSONUtils.INSTANCE.toJSON(m, true); - } catch (JsonProcessingException e) { - throw new IllegalStateException(e.getMessage(), e); - } - } - ).collect(Collectors.toList()) - ); + @Override + protected void setEmptiedMetaAlertField(Map<String, Object> docMap) { + docMap.put(METAALERT_FIELD, new ArrayList<>()); + } + + @Override + 
protected boolean isFiniteDoubleOnly() { + return true; + } + + @Override + protected boolean isEmptyMetaAlertList() { + return true; } } http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java index bb28abb..6f76093 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java @@ -22,12 +22,8 @@ import java.io.File; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.ExecutionException; import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.common.Constants; import org.apache.metron.common.utils.JSONUtils; @@ -36,11 +32,8 @@ import org.apache.metron.elasticsearch.integration.components.ElasticSearchCompo import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.IndexDao; import org.apache.metron.indexing.dao.SearchIntegrationTest; -import org.apache.metron.indexing.dao.search.GetRequest; import org.apache.metron.indexing.dao.search.FieldType; import org.apache.metron.indexing.dao.search.GroupRequest; -import org.apache.metron.indexing.dao.search.GroupResponse; -import org.apache.metron.indexing.dao.search.GroupResult; import 
org.apache.metron.indexing.dao.search.InvalidSearchException; import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; @@ -54,13 +47,8 @@ import org.json.simple.JSONArray; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; -import org.junit.Test; - -import java.io.File; -import java.io.IOException; -import java.util.HashMap; -import java.util.concurrent.ExecutionException; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.Test; public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { @@ -69,6 +57,7 @@ public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { private static String dateFormat = "yyyy.MM.dd.HH"; private static final int MAX_RETRIES = 10; private static final int SLEEP_MS = 500; + protected static IndexDao dao; /** * { @@ -196,8 +185,15 @@ public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { @Multiline private static String broDefaultStringMappings; - @Override - protected IndexDao createDao() throws Exception { + @BeforeClass + public static void setup() throws Exception { + indexComponent = startIndex(); + dao = createDao(); + // The data is all static for searches, so we can set it up beforehand, and it's faster + loadTestData(); + } + + protected static IndexDao createDao() { AccessConfig config = new AccessConfig(); config.setMaxSearchResults(100); config.setMaxSearchGroups(100); @@ -215,8 +211,7 @@ public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { return dao; } - @Override - protected InMemoryComponent startIndex() throws Exception { + protected static InMemoryComponent startIndex() throws Exception { InMemoryComponent es = new ElasticSearchComponent.Builder() .withHttpPort(9211) .withIndexDir(new File(indexDir)) @@ -225,32 +220,36 @@ public class ElasticsearchSearchIntegrationTest extends 
SearchIntegrationTest { return es; } - @Override - protected void loadTestData() - throws ParseException, IOException, ExecutionException, InterruptedException { - ElasticSearchComponent es = (ElasticSearchComponent)indexComponent; + protected static void loadTestData() throws ParseException { + ElasticSearchComponent es = (ElasticSearchComponent) indexComponent; es.getClient().admin().indices().prepareCreate("bro_index_2017.01.01.01") - .addMapping("bro_doc", broTypeMappings).addMapping("bro_doc_default", broDefaultStringMappings).get(); + .addMapping("bro_doc", broTypeMappings) + .addMapping("bro_doc_default", broDefaultStringMappings).get(); es.getClient().admin().indices().prepareCreate("snort_index_2017.01.01.02") - .addMapping("snort_doc", snortTypeMappings).get(); + .addMapping("snort_doc", snortTypeMappings).get(); - BulkRequestBuilder bulkRequest = es.getClient().prepareBulk().setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); + BulkRequestBuilder bulkRequest = es.getClient().prepareBulk() + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); JSONArray broArray = (JSONArray) new JSONParser().parse(broData); - for(Object o: broArray) { + for (Object o : broArray) { JSONObject jsonObject = (JSONObject) o; - IndexRequestBuilder indexRequestBuilder = es.getClient().prepareIndex("bro_index_2017.01.01.01", "bro_doc"); + IndexRequestBuilder indexRequestBuilder = es.getClient() + .prepareIndex("bro_index_2017.01.01.01", "bro_doc"); indexRequestBuilder = indexRequestBuilder.setId((String) jsonObject.get("guid")); indexRequestBuilder = indexRequestBuilder.setSource(jsonObject.toJSONString()); - indexRequestBuilder = indexRequestBuilder.setTimestamp(jsonObject.get("timestamp").toString()); + indexRequestBuilder = indexRequestBuilder + .setTimestamp(jsonObject.get("timestamp").toString()); bulkRequest.add(indexRequestBuilder); } JSONArray snortArray = (JSONArray) new JSONParser().parse(snortData); - for(Object o: snortArray) { + for (Object o : snortArray) 
{ JSONObject jsonObject = (JSONObject) o; - IndexRequestBuilder indexRequestBuilder = es.getClient().prepareIndex("snort_index_2017.01.01.02", "snort_doc"); + IndexRequestBuilder indexRequestBuilder = es.getClient() + .prepareIndex("snort_index_2017.01.01.02", "snort_doc"); indexRequestBuilder = indexRequestBuilder.setId((String) jsonObject.get("guid")); indexRequestBuilder = indexRequestBuilder.setSource(jsonObject.toJSONString()); - indexRequestBuilder = indexRequestBuilder.setTimestamp(jsonObject.get("timestamp").toString()); + indexRequestBuilder = indexRequestBuilder + .setTimestamp(jsonObject.get("timestamp").toString()); bulkRequest.add(indexRequestBuilder); } BulkResponse bulkResponse = bulkRequest.execute().actionGet(); @@ -357,4 +356,9 @@ public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { protected String getSourceTypeField() { return Constants.SENSOR_TYPE.replace('.', ':'); } + + @Override + protected IndexDao getIndexDao() { + return dao; + } } http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java index 0080d75..97993ff 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java @@ -20,17 +20,29 @@ package org.apache.metron.elasticsearch.integration; import com.fasterxml.jackson.core.JsonProcessingException; 
import com.google.common.collect.Iterables; import java.io.File; +import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.elasticsearch.dao.ElasticsearchDao; import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent; +import org.apache.metron.hbase.mock.MockHBaseTableProvider; +import org.apache.metron.hbase.mock.MockHTable; +import org.apache.metron.indexing.dao.AccessConfig; +import org.apache.metron.indexing.dao.HBaseDao; import org.apache.metron.indexing.dao.IndexDao; +import org.apache.metron.indexing.dao.MultiIndexDao; import org.apache.metron.indexing.dao.UpdateIntegrationTest; -import org.apache.metron.integration.InMemoryComponent; +import org.apache.metron.integration.UnableToStartException; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; public class ElasticsearchUpdateIntegrationTest extends UpdateIntegrationTest { private static final String SENSOR_NAME= "test"; @@ -39,13 +51,56 @@ public class ElasticsearchUpdateIntegrationTest extends UpdateIntegrationTest { private static String index = SENSOR_NAME + "_index_" + new SimpleDateFormat(dateFormat).format(new Date()); private static ElasticSearchComponent es; + private static final String TABLE_NAME = "modifications"; + private static final String CF = "p"; + private static MockHTable table; + private static IndexDao hbaseDao; + @Override protected String getIndexName() { return SENSOR_NAME + "_index_" + new SimpleDateFormat(dateFormat).format(new Date()); } - @Override - protected Map<String, Object> createGlobalConfig() throws Exception { + @BeforeClass + public static void setupBeforeClass() throws UnableToStartException { + es = new 
ElasticSearchComponent.Builder() + .withHttpPort(9211) + .withIndexDir(new File(indexDir)) + .build(); + es.start(); + } + + @Before + public void setup() throws IOException { + Configuration config = HBaseConfiguration.create(); + MockHBaseTableProvider tableProvider = new MockHBaseTableProvider(); + MockHBaseTableProvider.addToCache(TABLE_NAME, CF); + table = (MockHTable) tableProvider.getTable(config, TABLE_NAME); + + hbaseDao = new HBaseDao(); + AccessConfig accessConfig = new AccessConfig(); + accessConfig.setTableProvider(tableProvider); + Map<String, Object> globalConfig = createGlobalConfig(); + globalConfig.put(HBaseDao.HBASE_TABLE, TABLE_NAME); + globalConfig.put(HBaseDao.HBASE_CF, CF); + accessConfig.setGlobalConfigSupplier(() -> globalConfig); + + dao = new MultiIndexDao(hbaseDao, createDao()); + dao.init(accessConfig); + } + + @After + public void reset() { + es.reset(); + table.clear(); + } + + @AfterClass + public static void teardown() { + es.stop(); + } + + protected static Map<String, Object> createGlobalConfig() { return new HashMap<String, Object>() {{ put("es.clustername", "metron"); put("es.port", "9300"); @@ -54,27 +109,11 @@ public class ElasticsearchUpdateIntegrationTest extends UpdateIntegrationTest { }}; } - @Override - protected IndexDao createDao() throws Exception { + protected static IndexDao createDao() { return new ElasticsearchDao(); } @Override - protected InMemoryComponent startIndex() throws Exception { - es = new ElasticSearchComponent.Builder() - .withHttpPort(9211) - .withIndexDir(new File(indexDir)) - .build(); - es.start(); - return es; - } - - @Override - protected void loadTestData() throws Exception { - - } - - @Override protected void addTestData(String indexName, String sensorType, List<Map<String, Object>> docs) throws Exception { es.add(index, SENSOR_NAME @@ -94,4 +133,9 @@ public class ElasticsearchUpdateIntegrationTest extends UpdateIntegrationTest { protected List<Map<String, Object>> getIndexedTestData(String 
indexName, String sensorType) throws Exception { return es.getAllIndexedDocs(index, SENSOR_NAME + "_doc"); } + + @Override + protected MockHTable getMockHTable() { + return table; + } } http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java index e716ce1..45b4d60 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java @@ -274,19 +274,19 @@ public class ElasticSearchComponent implements InMemoryComponent { } - @Override - public void stop() { - try { - node.close(); - } catch (IOException e) { - throw new RuntimeException("Unable to stop node." , e); - } - node = null; - client = null; + @Override + public void stop() { + try { + node.close(); + } catch (IOException e) { + throw new RuntimeException("Unable to stop node." 
, e); } + node = null; + client = null; + } - @Override - public void reset() { - client.admin().indices().delete(new DeleteIndexRequest("*")).actionGet(); - } + @Override + public void reset() { + client.admin().indices().delete(new DeleteIndexRequest("*")).actionGet(); + } } http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-indexing/README.md ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/README.md b/metron-platform/metron-indexing/README.md index f4a4501..7a2ec29 100644 --- a/metron-platform/metron-indexing/README.md +++ b/metron-platform/metron-indexing/README.md @@ -194,7 +194,7 @@ The HBase column family to use for message updates. ### The `MetaAlertDao` The goal of meta alerts is to be able to group together a set of alerts while being able to transparently perform actions -like searches, as if meta alerts were normal alerts. `org.apache.metron.indexing.dao.MetaAlertDao` extends `IndexDao` and +like searches, as if meta alerts were normal alerts. 
`org.apache.metron.indexing.dao.metaalert.MetaAlertDao` extends `IndexDao` and enables several features: * the ability to get all meta alerts associated with an alert * creation of a meta alert http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-indexing/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/pom.xml b/metron-platform/metron-indexing/pom.xml index e7164e7..8561368 100644 --- a/metron-platform/metron-indexing/pom.xml +++ b/metron-platform/metron-indexing/pom.xml @@ -143,7 +143,7 @@ </dependency> <dependency> <groupId>org.mockito</groupId> - <artifactId>mockito-all</artifactId> + <artifactId>mockito-core</artifactId> <version>${global_mockito_version}</version> <scope>test</scope> </dependency> @@ -197,6 +197,12 @@ <type>test-jar</type> <scope>test</scope> </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-core</artifactId> + <version>1.3</version> + <scope>test</scope> + </dependency> </dependencies> <build> <plugins> http://git-wip-us.apache.org/repos/asf/metron/blob/49f851e0/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java index c301050..b1df46a 100644 --- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/AccessConfig.java @@ -17,6 +17,7 @@ */ package org.apache.metron.indexing.dao; +import java.util.function.Function; import org.apache.metron.hbase.TableProvider; import java.util.HashMap; @@ -27,6 +28,7 @@ public class AccessConfig { private Integer maxSearchResults; 
private Integer maxSearchGroups; private Supplier<Map<String, Object>> globalConfigSupplier; + private Function<String, String> indexSupplier; private Map<String, String> optionalSettings = new HashMap<>(); private TableProvider tableProvider = null; private Boolean isKerberosEnabled = false; @@ -42,6 +44,14 @@ public class AccessConfig { this.globalConfigSupplier = globalConfigSupplier; } + public Function<String, String> getIndexSupplier() { + return indexSupplier; + } + + public void setIndexSupplier(Function<String, String> indexSupplier) { + this.indexSupplier = indexSupplier; + } + /** * @return The maximum number of search results. */
