METRON-1289 Alert fields are lost when a MetaAlert is created (merrimanr) closes apache/metron#824
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/fd896fbe Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/fd896fbe Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/fd896fbe Branch: refs/heads/master Commit: fd896fbebe9d5e77eb11d1ce953ab2b55cc84387 Parents: c4c930f Author: merrimanr <[email protected]> Authored: Wed Nov 15 19:35:18 2017 -0600 Committer: merrimanr <[email protected]> Committed: Wed Nov 15 19:35:18 2017 -0600 ---------------------------------------------------------------------- metron-interface/metron-rest/README.md | 36 +- .../apache/metron/rest/config/IndexConfig.java | 3 +- .../rest/controller/MetaAlertController.java | 48 +- .../metron/rest/service/MetaAlertService.java | 10 + .../rest/service/impl/MetaAlertServiceImpl.java | 31 + .../rest/service/impl/SearchServiceImpl.java | 18 +- .../MetaAlertControllerIntegrationTest.java | 120 +- .../UpdateControllerIntegrationTest.java | 5 +- .../org/apache/metron/common/utils/KeyUtil.java | 50 + .../hbase/HBaseEnrichmentConverterTest.java | 21 + .../elasticsearch/dao/ElasticsearchDao.java | 115 +- .../dao/ElasticsearchMetaAlertDao.java | 717 +++++----- .../elasticsearch/dao/MetaAlertStatus.java | 34 - .../dao/ElasticsearchMetaAlertDaoTest.java | 304 +--- .../ElasticsearchMetaAlertIntegrationTest.java | 1301 ++++++++++-------- .../ElasticsearchUpdateIntegrationTest.java | 4 +- .../enrichment/converter/EnrichmentKey.java | 23 +- metron-platform/metron-indexing/README.md | 17 +- metron-platform/metron-indexing/pom.xml | 7 + .../apache/metron/indexing/dao/HBaseDao.java | 128 +- .../apache/metron/indexing/dao/IndexDao.java | 38 +- .../metron/indexing/dao/MetaAlertDao.java | 91 +- .../metron/indexing/dao/MultiIndexDao.java | 54 + .../metaalert/MetaAlertAddRemoveRequest.java | 41 + .../dao/metaalert/MetaAlertCreateRequest.java | 14 +- .../indexing/dao/metaalert/MetaAlertStatus.java | 34 + 
.../metron/indexing/dao/search/GetRequest.java | 35 +- .../apache/metron/indexing/dao/InMemoryDao.java | 25 +- .../indexing/dao/InMemoryMetaAlertDao.java | 96 +- .../indexing/dao/SearchIntegrationTest.java | 32 +- .../integration/HBaseDaoIntegrationTest.java | 164 +++ 31 files changed, 2219 insertions(+), 1397 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/README.md ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/README.md b/metron-interface/metron-rest/README.md index b79b44d..724239b 100644 --- a/metron-interface/metron-rest/README.md +++ b/metron-interface/metron-rest/README.md @@ -218,6 +218,9 @@ Request and Response objects are JSON formatted. The JSON schemas are available | [ `GET /api/v1/kafka/topic/{name}/sample`](#get-apiv1kafkatopicnamesample)| | [ `GET /api/v1/metaalert/searchByAlert`](#get-apiv1metaalertsearchbyalert)| | [ `GET /api/v1/metaalert/create`](#get-apiv1metaalertcreate)| +| [ `POST /api/v1/metaalert/add/alert`](#post-apiv1metaalertaddalert)| +| [ `POST /api/v1/metaalert/remove/alert`](#post-apiv1metaalertremovealert)| +| [ `POST /api/v1/metaalert/update/status/{guid}/{status}`](#post-apiv1metaalertupdatestatusguidstatus)| | [ `GET /api/v1/search/search`](#get-apiv1searchsearch)| | [ `POST /api/v1/search/search`](#get-apiv1searchsearch)| | [ `POST /api/v1/search/group`](#get-apiv1searchgroup)| @@ -415,19 +418,40 @@ Request and Response objects are JSON formatted. The JSON schemas are available * 404 - Either Kafka topic is missing or contains no messages ### `POST /api/v1/metaalert/searchByAlert` - * Description: Searches meta alerts to find any containing an alert for the provided GUID + * Description: Get all meta alerts that contain an alert. 
* Input: * guid - GUID of the alert * Returns: - * 200 - Returns the meta alerts associated with this alert - * 404 - The child alert isn't found + * 200 - Search results ### `POST /api/v1/metaalert/create` - * Description: Creates a meta alert containing the provide alerts + * Description: Creates a new meta alert from a list of existing alerts. The meta alert status will initially be set to 'ACTIVE' and summary statistics will be computed from the list of alerts. A list of groups included in the request is also added to the meta alert. * Input: - * request - Meta Alert Create Request + * request - Meta alert create request which includes a list of alert get requests and a list of custom groups used to annotate a meta alert. * Returns: - * 200 - The meta alert was created + * 200 - The GUID of the new meta alert + +### `POST /api/v1/metaalert/add/alert` + * Description: Adds an alert to an existing meta alert. An alert will not be added if it is already contained in a meta alert. + * Input: + * request - Meta alert add request which includes a meta alert GUID and list of alert get requests + * Returns: + * 200 - Returns 'true' if the alert was added and 'false' if the meta alert did not change. + +### `POST /api/v1/metaalert/remove/alert` + * Description: Removes an alert from an existing meta alert. If the alert to be removed is not in a meta alert, 'false' will be returned. + * Input: + * request - Meta alert remove request which includes a meta alert GUID and list of alert get requests + * Returns: + * 200 - Returns 'true' if the alert was removed and 'false' if the meta alert did not change. + +### `POST /api/v1/metaalert/update/status/{guid}/{status}` + * Description: Updates the status of a meta alert to either 'ACTIVE' or 'INACTIVE'. + * Input: + * guid - Meta alert GUID + * status - Meta alert status with a value of either 'ACTIVE' or 'INACTIVE' + * Returns: + * 200 - Returns 'true' if the status changed and 'false' if it did not. 
### `POST /api/v1/search/search` * Description: Searches the indexing store. GUIDs must be quoted to ensure correct results. http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java index 8eabb2e..4ce9644 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java @@ -19,6 +19,7 @@ package org.apache.metron.rest.config; import static org.apache.metron.rest.MetronRestConstants.INDEX_DAO_IMPL; +import java.util.Optional; import org.apache.metron.hbase.HTableProvider; import org.apache.metron.hbase.TableProvider; import org.apache.metron.indexing.dao.AccessConfig; @@ -81,7 +82,7 @@ public class IndexConfig { // Create the meta alert dao and wrap it around the index dao. 
MetaAlertDao ret = (MetaAlertDao) IndexDaoFactory.create(metaDaoImpl, config).get(0); - ret.init(indexDao, metaDaoSort); + ret.init(indexDao, Optional.ofNullable(metaDaoSort)); return ret; } catch(RuntimeException re) { http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/MetaAlertController.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/MetaAlertController.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/MetaAlertController.java index e9cff8b..d42403a 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/MetaAlertController.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/MetaAlertController.java @@ -21,6 +21,8 @@ package org.apache.metron.rest.controller; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; import io.swagger.annotations.ApiResponse; +import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus; +import org.apache.metron.indexing.dao.metaalert.MetaAlertAddRemoveRequest; import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest; import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse; import org.apache.metron.indexing.dao.search.SearchResponse; @@ -29,6 +31,7 @@ import org.apache.metron.rest.service.MetaAlertService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; @@ -41,24 +44,59 @@ public class MetaAlertController { @Autowired 
private MetaAlertService metaAlertService; - @ApiOperation(value = "Get all meta alerts for alert") + @ApiOperation(value = "Get all meta alerts that contain an alert.") @ApiResponse(message = "Search results", code = 200) @RequestMapping(value = "/searchByAlert", method = RequestMethod.POST) ResponseEntity<SearchResponse> searchByAlert( - @ApiParam(name = "guid", value = "GUID", required = true) + @ApiParam(name = "guid", value = "Alert GUID", required = true) @RequestBody final String guid ) throws RestException { return new ResponseEntity<>(metaAlertService.getAllMetaAlertsForAlert(guid), HttpStatus.OK); } - @ApiOperation(value = "Create a meta alert") - @ApiResponse(message = "Created meta alert", code = 200) + @ApiOperation(value = "Creates a new meta alert from a list of existing alerts. " + + "The meta alert status will initially be set to 'ACTIVE' and summary statistics " + + "will be computed from the list of alerts. A list of groups included in the request are also added to the meta alert.") + @ApiResponse(message = "The GUID of the new meta alert", code = 200) @RequestMapping(value = "/create", method = RequestMethod.POST) ResponseEntity<MetaAlertCreateResponse> create( - @ApiParam(name = "request", value = "Meta Alert Create Request", required = true) + @ApiParam(name = "createRequest", value = "Meta alert create request which includes a list of alert " + + "get requests and a list of custom groups used to annotate a meta alert", required = true) @RequestBody final MetaAlertCreateRequest createRequest ) throws RestException { return new ResponseEntity<>(metaAlertService.create(createRequest), HttpStatus.OK); } + + @ApiOperation(value = "Adds an alert to an existing meta alert. 
An alert will not be added if it is already contained in a meta alert.") + @ApiResponse(message = "Returns 'true' if the alert was added and 'false' if the meta alert did not change.", code = 200) + @RequestMapping(value = "/add/alert", method = RequestMethod.POST) + ResponseEntity<Boolean> addAlertsToMetaAlert( + @ApiParam(name = "metaAlertAddRemoveRequest", value = "Meta alert add request which includes a meta alert GUID and list of alert get requests", required = true) + @RequestBody final MetaAlertAddRemoveRequest metaAlertAddRemoveRequest + ) throws RestException { + return new ResponseEntity<>(metaAlertService.addAlertsToMetaAlert(metaAlertAddRemoveRequest), HttpStatus.OK); + } + + @ApiOperation(value = "Removes an alert from an existing meta alert. If the alert to be removed is not in a meta alert, 'false' will be returned.") + @ApiResponse(message = "Returns 'true' if the alert was removed and 'false' if the meta alert did not change.", code = 200) + @RequestMapping(value = "/remove/alert", method = RequestMethod.POST) + ResponseEntity<Boolean> removeAlertsFromMetaAlert( + @ApiParam(name = "metaAlertAddRemoveRequest", value = "Meta alert remove request which includes a meta alert GUID and list of alert get requests", required = true) + @RequestBody final MetaAlertAddRemoveRequest metaAlertAddRemoveRequest + ) throws RestException { + return new ResponseEntity<>(metaAlertService.removeAlertsFromMetaAlert(metaAlertAddRemoveRequest), HttpStatus.OK); + } + + @ApiOperation(value = "Updates the status of a meta alert to either 'ACTIVE' or 'INACTIVE'.") + @ApiResponse(message = "Returns 'true' if the status changed and 'false' if it did not.", code = 200) + @RequestMapping(value = "/update/status/{guid}/{status}", method = RequestMethod.POST) + ResponseEntity<Boolean> updateMetaAlertStatus( + final @ApiParam(name = "guid", value = "Meta alert GUID", required = true) + @PathVariable String guid, + final @ApiParam(name = "status", value = "Meta alert status with a 
value of either 'ACTIVE' or 'INACTIVE'", required = true) + @PathVariable String status) throws RestException { + return new ResponseEntity<>(metaAlertService.updateMetaAlertStatus(guid, + MetaAlertStatus.valueOf(status.toUpperCase())), HttpStatus.OK); + } } http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/MetaAlertService.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/MetaAlertService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/MetaAlertService.java index c339506..e8abaf3 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/MetaAlertService.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/MetaAlertService.java @@ -18,8 +18,12 @@ package org.apache.metron.rest.service; +import java.io.IOException; +import java.util.Collection; +import org.apache.metron.indexing.dao.metaalert.MetaAlertAddRemoveRequest; import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest; import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse; +import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus; import org.apache.metron.indexing.dao.search.SearchResponse; import org.apache.metron.rest.RestException; @@ -28,4 +32,10 @@ public interface MetaAlertService { MetaAlertCreateResponse create(MetaAlertCreateRequest createRequest) throws RestException; SearchResponse getAllMetaAlertsForAlert(String guid) throws RestException; + + boolean addAlertsToMetaAlert(MetaAlertAddRemoveRequest metaAlertAddRemoveRequest) throws RestException; + + boolean removeAlertsFromMetaAlert(MetaAlertAddRemoveRequest metaAlertAddRemoveRequest) throws RestException; + + boolean updateMetaAlertStatus(String metaAlertGuid, MetaAlertStatus status) throws RestException; } 
http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java index f120c9e..aafab24 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java @@ -19,10 +19,13 @@ package org.apache.metron.rest.service.impl; import java.io.IOException; +import java.util.Collection; import org.apache.metron.indexing.dao.IndexDao; import org.apache.metron.indexing.dao.MetaAlertDao; +import org.apache.metron.indexing.dao.metaalert.MetaAlertAddRemoveRequest; import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest; import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse; +import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus; import org.apache.metron.indexing.dao.search.InvalidCreateException; import org.apache.metron.indexing.dao.search.InvalidSearchException; import org.apache.metron.indexing.dao.search.SearchRequest; @@ -63,4 +66,32 @@ public class MetaAlertServiceImpl implements MetaAlertService { throw new RestException(ise.getMessage(), ise); } } + + @Override + public boolean addAlertsToMetaAlert(MetaAlertAddRemoveRequest metaAlertAddRemoveRequest) throws RestException { + try { + return dao.addAlertsToMetaAlert(metaAlertAddRemoveRequest.getMetaAlertGuid(), metaAlertAddRemoveRequest.getAlerts()); + } catch (IOException ioe) { + throw new RestException(ioe.getMessage(), ioe); + } + } + + @Override + public boolean removeAlertsFromMetaAlert(MetaAlertAddRemoveRequest 
metaAlertAddRemoveRequest) throws RestException { + try { + return dao.removeAlertsFromMetaAlert(metaAlertAddRemoveRequest.getMetaAlertGuid(), metaAlertAddRemoveRequest.getAlerts()); + } catch (IOException ioe) { + throw new RestException(ioe.getMessage(), ioe); + } + } + + @Override + public boolean updateMetaAlertStatus(String metaAlertGuid, MetaAlertStatus status) + throws RestException { + try { + return dao.updateMetaAlertStatus(metaAlertGuid, status); + } catch (IOException ioe) { + throw new RestException(ioe.getMessage(), ioe); + } + } } http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java index efd80a7..433eae3 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java @@ -58,13 +58,10 @@ public class SearchServiceImpl implements SearchService { @Override public SearchResponse search(SearchRequest searchRequest) throws RestException { try { - // Pull the indices from the cache by default if (searchRequest.getIndices() == null || searchRequest.getIndices().isEmpty()) { - List<String> indices = Lists.newArrayList((sensorIndexingConfigService.getAllIndices(environment.getProperty(INDEX_WRITER_NAME)))); - // metaalerts should be included by default + List<String> indices = getDefaultIndices(); + // metaalerts should be included by default in search requests indices.add(METAALERT_TYPE); - // errors should not be included by default - indices.remove(ERROR_TYPE); searchRequest.setIndices(indices); } return 
dao.search(searchRequest); @@ -77,6 +74,9 @@ public class SearchServiceImpl implements SearchService { @Override public GroupResponse group(GroupRequest groupRequest) throws RestException { try { + if (groupRequest.getIndices() == null || groupRequest.getIndices().isEmpty()) { + groupRequest.setIndices(getDefaultIndices()); + } return dao.group(groupRequest); } catch(InvalidSearchException ise) { @@ -112,4 +112,12 @@ public class SearchServiceImpl implements SearchService { throw new RestException(ioe.getMessage(), ioe); } } + + private List<String> getDefaultIndices() throws RestException { + // Pull the indices from the cache by default + List<String> indices = Lists.newArrayList((sensorIndexingConfigService.getAllIndices(environment.getProperty(INDEX_WRITER_NAME)))); + // errors should not be included by default + indices.remove(ERROR_TYPE); + return indices; + } } http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java index 983c207..b0dd774 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java @@ -28,11 +28,22 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. 
import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; import com.google.common.collect.ImmutableMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import org.adrianwalker.multilinestring.Multiline; import org.apache.curator.framework.CuratorFramework; +import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.indexing.dao.InMemoryMetaAlertDao; import org.apache.metron.indexing.dao.MetaAlertDao; import org.apache.metron.indexing.dao.SearchIntegrationTest; +import org.apache.metron.indexing.dao.metaalert.MetaAlertAddRemoveRequest; +import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest; +import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse; +import org.apache.metron.indexing.dao.search.GetRequest; import org.apache.metron.rest.service.MetaAlertService; +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -67,10 +78,18 @@ public class MetaAlertControllerIntegrationTest extends DaoControllerTest { /** { - "guidToIndices" : { - "bro_1":"bro_index_2017.01.01.01", - "snort_2":"snort_index_2017.01.01.01" + "alerts" : [ + { + "guid": "bro_1", + "sensorType": "bro", + "index": "bro_index_2017.01.01.01" }, + { + "guid": "snort_2", + "sensorType": "snort", + "index": "snort_index_2017.01.01.01" + } + ], "groups" : ["group_one", "group_two"] } */ @@ -88,6 +107,11 @@ public class MetaAlertControllerIntegrationTest extends DaoControllerTest { loadTestData(testData); } + @After + public void cleanup() { + InMemoryMetaAlertDao.clear(); + } + @Test public void test() throws Exception { // Testing searching by alert @@ -171,4 +195,94 @@ public class MetaAlertControllerIntegrationTest extends DaoControllerTest { .andExpect(jsonPath("$.results[0].source.guid").value("meta_3")) .andExpect(jsonPath("$.results[0].source.count").value(2.0)); } + + @Test + public void 
shouldAddRemoveAlerts() throws Exception { + MetaAlertCreateRequest metaAlertCreateRequest = new MetaAlertCreateRequest(); + metaAlertCreateRequest.setGroups(Arrays.asList("group_one", "group_two")); + metaAlertCreateRequest.setAlerts(new ArrayList<GetRequest>() {{ + add(new GetRequest("bro_1", "bro", "bro_index_2017.01.01.01")); + add(new GetRequest("snort_2", "snort", "snort_index_2017.01.01.01")); + }}); + MetaAlertCreateResponse metaAlertCreateResponse = metaAlertService.create(metaAlertCreateRequest); + + MetaAlertAddRemoveRequest addRequest = new MetaAlertAddRemoveRequest(); + addRequest.setMetaAlertGuid(metaAlertCreateResponse.getGuid()); + addRequest.setAlerts(new ArrayList<GetRequest>() {{ + add(new GetRequest("bro_2", "bro", "bro_index_2017.01.01.01")); + add(new GetRequest("bro_3", "bro", "bro_index_2017.01.01.01")); + }}); + + ResultActions result = this.mockMvc.perform( + post(metaalertUrl + "/add/alert") + .with(httpBasic(user, password)).with(csrf()) + .contentType(MediaType.parseMediaType("application/json;charset=UTF-8")) + .content(JSONUtils.INSTANCE.toJSON(addRequest, false))); + result.andExpect(status().isOk()).andExpect(content().string("true")); + + MetaAlertAddRemoveRequest addDuplicateRequest = new MetaAlertAddRemoveRequest(); + addDuplicateRequest.setMetaAlertGuid(metaAlertCreateResponse.getGuid()); + addDuplicateRequest.setAlerts(new ArrayList<GetRequest>() {{ + add(new GetRequest("bro_1", "bro")); + }}); + + result = this.mockMvc.perform( + post(metaalertUrl + "/add/alert") + .with(httpBasic(user, password)).with(csrf()) + .contentType(MediaType.parseMediaType("application/json;charset=UTF-8")) + .content(JSONUtils.INSTANCE.toJSON(addDuplicateRequest, false))); + result.andExpect(status().isOk()).andExpect(content().string("false")); + + MetaAlertAddRemoveRequest removeRequest = new MetaAlertAddRemoveRequest(); + removeRequest.setMetaAlertGuid(metaAlertCreateResponse.getGuid()); + removeRequest.setAlerts(new ArrayList<GetRequest>() {{ + 
add(new GetRequest("bro_2", "bro")); + add(new GetRequest("bro_3", "bro")); + }}); + + result = this.mockMvc.perform( + post(metaalertUrl + "/remove/alert") + .with(httpBasic(user, password)).with(csrf()) + .contentType(MediaType.parseMediaType("application/json;charset=UTF-8")) + .content(JSONUtils.INSTANCE.toJSON(removeRequest, false))); + result.andExpect(status().isOk()).andExpect(content().string("true")); + + MetaAlertAddRemoveRequest removeMissingRequest = new MetaAlertAddRemoveRequest(); + addRequest.setMetaAlertGuid(metaAlertCreateResponse.getGuid()); + removeMissingRequest.setAlerts(new ArrayList<GetRequest>() {{ + add(new GetRequest("bro_1", "bro")); + }}); + + result = this.mockMvc.perform( + post(metaalertUrl + "/remove/alert") + .with(httpBasic(user, password)).with(csrf()) + .contentType(MediaType.parseMediaType("application/json;charset=UTF-8")) + .content(JSONUtils.INSTANCE.toJSON(removeMissingRequest, false))); + result.andExpect(status().isOk()).andExpect(content().string("false")); + } + + @Test + public void shouldUpdateStatus() throws Exception { + MetaAlertCreateRequest metaAlertCreateRequest = new MetaAlertCreateRequest(); + metaAlertCreateRequest.setGroups(Arrays.asList("group_one", "group_two")); + metaAlertCreateRequest.setAlerts(new ArrayList<GetRequest>() {{ + add(new GetRequest("bro_1", "bro", "bro_index_2017.01.01.01")); + add(new GetRequest("snort_2", "snort", "snort_index_2017.01.01.01")); + }}); + + MetaAlertCreateResponse metaAlertCreateResponse = metaAlertService.create(metaAlertCreateRequest); + + ResultActions result = this.mockMvc.perform( + post(metaalertUrl + "/update/status/" + metaAlertCreateResponse.getGuid() + "/inactive") + .with(httpBasic(user, password)).with(csrf()) + .contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))); + result.andExpect(status().isOk()).andExpect(content().string("true")); + + result = this.mockMvc.perform( + post(metaalertUrl + "/update/status/" + 
metaAlertCreateResponse.getGuid() + "/inactive") + .with(httpBasic(user, password)).with(csrf()) + .contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))); + result.andExpect(status().isOk()).andExpect(content().string("false")); + } + } http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java index 4708bc4..57a1b28 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; import org.apache.metron.hbase.mock.MockHTable; import org.apache.metron.hbase.mock.MockHBaseTableProvider; +import org.apache.metron.indexing.dao.HBaseDao; import org.apache.metron.indexing.dao.MetaAlertDao; import org.apache.metron.indexing.dao.SearchIntegrationTest; import org.apache.metron.rest.service.UpdateService; @@ -161,7 +162,7 @@ public class UpdateControllerIntegrationTest extends DaoControllerTest { Assert.assertEquals(1,table.size()); { //ensure hbase is up to date - Get g = new Get(guid.getBytes()); + Get g = new Get(new HBaseDao.Key(guid,"bro").toBytes()); Result r = table.get(g); NavigableMap<byte[], byte[]> columns = r.getFamilyMap(CF.getBytes()); Assert.assertEquals(1, columns.size()); @@ -183,7 +184,7 @@ public class UpdateControllerIntegrationTest extends DaoControllerTest { Assert.assertEquals(1,table.size()); { //ensure hbase is up to date - 
Get g = new Get(guid.getBytes()); + Get g = new Get(new HBaseDao.Key(guid, "bro").toBytes()); Result r = table.get(g); NavigableMap<byte[], byte[]> columns = r.getFamilyMap(CF.getBytes()); Assert.assertEquals(2, columns.size()); http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KeyUtil.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KeyUtil.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KeyUtil.java new file mode 100644 index 0000000..595a839 --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/KeyUtil.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.common.utils; + +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; + +public enum KeyUtil { + INSTANCE; + private static final int SEED = 0xDEADBEEF; + public static final int HASH_PREFIX_SIZE=16; + ThreadLocal<HashFunction> hFunction= new ThreadLocal<HashFunction>() { + @Override + protected HashFunction initialValue() { + return Hashing.murmur3_128(SEED); + } + }; + + public byte[] getPrefix(byte[] key) { + Hasher hasher = hFunction.get().newHasher(); + hasher.putBytes(key); + return hasher.hash().asBytes(); + } + + public byte[] merge(byte[] prefix, byte[] key) { + byte[] val = new byte[key.length + prefix.length]; + int offset = 0; + System.arraycopy(prefix, 0, val, offset, prefix.length); + offset += prefix.length; + System.arraycopy(key, 0, val, offset, key.length); + return val; + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java index a018e27..fff1d9b 100644 --- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java +++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/hbase/HBaseEnrichmentConverterTest.java @@ -34,6 +34,15 @@ import java.util.HashMap; public class HBaseEnrichmentConverterTest { + public static byte[] keyBytes = new byte[] { + 0x31,(byte)0xc2,0x49,0x05,0x6b,(byte)0xea, + 0x0e,0x59,(byte)0xe1,(byte)0xad,(byte)0xa0,0x24, + 0x55,(byte)0xa9,0x6b,0x63,0x00,0x06, + 
0x64,0x6f,0x6d,0x61,0x69,0x6e, + 0x00,0x06,0x67,0x6f,0x6f,0x67, + 0x6c,0x65 + }; + EnrichmentKey key = new EnrichmentKey("domain", "google"); EnrichmentValue value = new EnrichmentValue( new HashMap<String, Object>() {{ @@ -41,6 +50,18 @@ public class HBaseEnrichmentConverterTest { put("grok", "baz"); }}); LookupKV<EnrichmentKey, EnrichmentValue> results = new LookupKV(key, value); + + /** + * IF this test fails then you have broken the key serialization in that your change has + * caused a key to change serialization, so keys from previous releases will not be able to be found + * under your scheme. Please either provide a migration plan or undo this change. DO NOT CHANGE THIS + * TEST BLITHELY! + */ + @Test + public void testKeySerializationRemainsConstant() { + byte[] raw = key.toBytes(); + Assert.assertArrayEquals(raw, keyBytes); + } @Test public void testKeySerialization() { byte[] serialized = key.toBytes(); http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java index f114b4c..61d5472 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java @@ -25,9 +25,11 @@ import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import 
java.util.Map; @@ -38,6 +40,7 @@ import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.IndexDao; import org.apache.metron.indexing.dao.search.FieldType; +import org.apache.metron.indexing.dao.search.GetRequest; import org.apache.metron.indexing.dao.search.Group; import org.apache.metron.indexing.dao.search.GroupOrder; import org.apache.metron.indexing.dao.search.GroupOrderType; @@ -55,10 +58,9 @@ import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.common.collect.ImmutableOpenMap; @@ -256,40 +258,73 @@ public class ElasticsearchDao implements IndexDao { return ret.orElse(null); } + @Override + public Iterable<Document> getAllLatest( + final List<GetRequest> getRequests) throws IOException { + Collection<String> guids = new HashSet<>(); + Collection<String> sensorTypes = new HashSet<>(); + for (GetRequest getRequest: getRequests) { + guids.add(getRequest.getGuid()); + sensorTypes.add(getRequest.getSensorType()); + } + List<Document> documents = searchByGuids( + guids + , sensorTypes + , hit -> { + Long ts = 0L; + String doc = hit.getSourceAsString(); + String sourceType = Iterables.getFirst(Splitter.on("_doc").split(hit.getType()), null); + try { + return Optional.of(new Document(doc, hit.getId(), sourceType, ts)); + } catch (IOException e) { + throw new 
IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e); + } + } + + ); + return documents; + } + + <T> Optional<T> searchByGuid(String guid, String sensorType, + Function<SearchHit, Optional<T>> callback) { + Collection<String> sensorTypes = sensorType != null ? Collections.singleton(sensorType) : null; + List<T> results = searchByGuids(Collections.singleton(guid), sensorTypes, callback); + if (results.size() > 0) { + return Optional.of(results.get(0)); + } else { + return Optional.empty(); + } + } + /** * Return the search hit based on the UUID and sensor type. * A callback can be specified to transform the hit into a type T. * If more than one hit happens, the first one will be returned. */ - <T> Optional<T> searchByGuid(String guid, String sensorType, + <T> List<T> searchByGuids(Collection<String> guids, Collection<String> sensorTypes, Function<SearchHit, Optional<T>> callback) { QueryBuilder query; - if (sensorType != null) { - query = QueryBuilders.idsQuery(sensorType + "_doc").ids(guid); + if (sensorTypes != null) { + String[] types = sensorTypes.stream().map(sensorType -> sensorType + "_doc").toArray(String[]::new); + query = QueryBuilders.idsQuery(types).ids(guids); } else { - query = QueryBuilders.idsQuery().ids(guid); + query = QueryBuilders.idsQuery().ids(guids); } SearchRequestBuilder request = client.prepareSearch() .setQuery(query) .setSource("message") + .setSize(guids.size()) ; org.elasticsearch.action.search.SearchResponse response = request.get(); SearchHits hits = response.getHits(); - long totalHits = hits.getTotalHits(); - if (totalHits > 1) { - LOG.warn("Encountered {} results for guid {} in sensor {}. 
Returning first hit.", - totalHits, - guid, - sensorType - ); - } + List<T> results = new ArrayList<>(); for (SearchHit hit : hits) { - Optional<T> ret = callback.apply(hit); - if (ret.isPresent()) { - return ret; + Optional<T> result = callback.apply(hit); + if (result.isPresent()) { + results.add(result.get()); } } - return Optional.empty(); + return results; } @Override @@ -297,18 +332,17 @@ public class ElasticsearchDao implements IndexDao { String indexPostfix = ElasticsearchUtils .getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date()); String sensorType = update.getSensorType(); - String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, null); - String existingIndex = calculateExistingIndex(update, index, indexPostfix); + String indexName = getIndexName(update, index, indexPostfix); - UpdateRequest updateRequest = buildUpdateRequest(update, sensorType, indexName, existingIndex); + IndexRequest indexRequest = buildIndexRequest(update, sensorType, indexName); try { - UpdateResponse response = client.update(updateRequest).get(); + IndexResponse response = client.index(indexRequest).get(); ShardInfo shardInfo = response.getShardInfo(); int failed = shardInfo.getFailed(); if (failed > 0) { throw new IOException( - "ElasticsearchDao upsert failed: " + Arrays.toString(shardInfo.getFailures())); + "ElasticsearchDao index failed: " + Arrays.toString(shardInfo.getFailures())); } } catch (Exception e) { throw new IOException(e.getMessage(), e); @@ -326,16 +360,14 @@ public class ElasticsearchDao implements IndexDao { for (Map.Entry<Document, Optional<String>> updateEntry : updates.entrySet()) { Document update = updateEntry.getKey(); String sensorType = update.getSensorType(); - String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, null); - String existingIndex = calculateExistingIndex(update, updateEntry.getValue(), indexPostfix); - UpdateRequest updateRequest = buildUpdateRequest( + String indexName = 
getIndexName(update, updateEntry.getValue(), indexPostfix); + IndexRequest indexRequest = buildIndexRequest( update, sensorType, - indexName, - existingIndex + indexName ); - bulkRequestBuilder.add(updateRequest); + bulkRequestBuilder.add(indexRequest); } BulkResponse bulkResponse = bulkRequestBuilder.get(); @@ -346,21 +378,20 @@ public class ElasticsearchDao implements IndexDao { } } - protected String calculateExistingIndex(Document update, Optional<String> index, - String indexPostFix) { - String sensorType = update.getSensorType(); - String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostFix, null); + protected String getIndexName(Document update, Optional<String> index, String indexPostFix) { + return index.orElse(getIndexName(update.getGuid(), update.getSensorType()) + .orElse(ElasticsearchUtils.getIndexName(update.getSensorType(), indexPostFix, null)) + ); + } - return index.orElse( - searchByGuid(update.getGuid(), - sensorType, - hit -> Optional.ofNullable(hit.getIndex()) - ).orElse(indexName) + protected Optional<String> getIndexName(String guid, String sensorType) { + return searchByGuid(guid, + sensorType, + hit -> Optional.ofNullable(hit.getIndex()) ); } - protected UpdateRequest buildUpdateRequest(Document update, String sensorType, String indexName, - String existingIndex) { + protected IndexRequest buildIndexRequest(Document update, String sensorType, String indexName) { String type = sensorType + "_doc"; Object ts = update.getTimestamp(); IndexRequest indexRequest = new IndexRequest(indexName, type, update.getGuid()) @@ -370,9 +401,7 @@ public class ElasticsearchDao implements IndexDao { indexRequest = indexRequest.timestamp(ts.toString()); } - return new UpdateRequest(existingIndex, type, update.getGuid()) - .doc(update.getDocument()) - .upsert(indexRequest); + return indexRequest; } @SuppressWarnings("unchecked") 
http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java index eef134f..c24ba0c 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java @@ -18,18 +18,21 @@ package org.apache.metron.elasticsearch.dao; -import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.apache.metron.common.Constants.GUID; import static org.elasticsearch.index.query.QueryBuilders.boolQuery; import static org.elasticsearch.index.query.QueryBuilders.constantScoreQuery; import static org.elasticsearch.index.query.QueryBuilders.existsQuery; import static org.elasticsearch.index.query.QueryBuilders.nestedQuery; import static org.elasticsearch.index.query.QueryBuilders.termQuery; +import com.fasterxml.jackson.databind.JsonNode; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -37,7 +40,6 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; -import org.apache.commons.collections4.SetUtils; import org.apache.metron.common.Constants; import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.IndexDao; 
@@ -45,8 +47,10 @@ import org.apache.metron.indexing.dao.MetaAlertDao; import org.apache.metron.indexing.dao.MultiIndexDao; import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest; import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse; +import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus; import org.apache.metron.indexing.dao.metaalert.MetaScores; import org.apache.metron.indexing.dao.search.FieldType; +import org.apache.metron.indexing.dao.search.GetRequest; import org.apache.metron.indexing.dao.search.GroupRequest; import org.apache.metron.indexing.dao.search.GroupResponse; import org.apache.metron.indexing.dao.search.InvalidCreateException; @@ -55,31 +59,26 @@ import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; import org.apache.metron.indexing.dao.search.SearchResult; import org.apache.metron.indexing.dao.update.Document; -import org.elasticsearch.action.ActionWriteResponse.ShardInfo; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.action.get.MultiGetItemResponse; -import org.elasticsearch.action.get.MultiGetRequest.Item; -import org.elasticsearch.action.get.MultiGetRequestBuilder; -import org.elasticsearch.action.get.MultiGetResponse; -import org.elasticsearch.action.index.IndexRequest; +import org.apache.metron.indexing.dao.update.OriginalNotFoundException; +import org.apache.metron.indexing.dao.update.PatchRequest; import org.elasticsearch.action.search.SearchRequestBuilder; -import org.elasticsearch.action.update.UpdateRequest; -import org.elasticsearch.action.update.UpdateResponse; -import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.index.query.QueryStringQueryBuilder; import org.elasticsearch.index.query.support.QueryInnerHitBuilder; -import org.elasticsearch.search.SearchHit; public 
class ElasticsearchMetaAlertDao implements MetaAlertDao { - private static final String SOURCE_TYPE = Constants.SENSOR_TYPE.replace('.', ':'); + public static final String SOURCE_TYPE = Constants.SENSOR_TYPE.replace('.', ':'); + private static final String STATUS_PATH = "/status"; + private static final String ALERT_PATH = "/alert"; + private IndexDao indexDao; private ElasticsearchDao elasticsearchDao; private String index = METAALERTS_INDEX; private String threatTriageField = THREAT_FIELD_DEFAULT; private String threatSort = THREAT_SORT_DEFAULT; + private int pageSize = 500; /** * Wraps an {@link org.apache.metron.indexing.dao.IndexDao} to handle meta alerts. @@ -96,7 +95,7 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao { */ public ElasticsearchMetaAlertDao(IndexDao indexDao, String index, String triageLevelField, String threatSort) { - init(indexDao, threatSort); + init(indexDao, Optional.of(threatSort)); this.index = index; this.threatTriageField = triageLevelField; } @@ -105,8 +104,14 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao { //uninitialized. } + /** + * Initializes this implementation by setting the supplied IndexDao and also setting a separate ElasticsearchDao. + * This is needed for some specific Elasticsearch functions (looking up an index from a GUID for example). + * @param indexDao The DAO to wrap for our queries + * @param threatSort The aggregation to use as the threat field. E.g. "sum", "median", etc. 
+ */ @Override - public void init(IndexDao indexDao, String threatSort) { + public void init(IndexDao indexDao, Optional<String> threatSort) { if (indexDao instanceof MultiIndexDao) { this.indexDao = indexDao; MultiIndexDao multiIndexDao = (MultiIndexDao) indexDao; @@ -124,8 +129,8 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao { ); } - if (threatSort != null) { - this.threatSort = threatSort; + if (threatSort.isPresent()) { + this.threatSort = threatSort.get(); } } @@ -139,66 +144,63 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao { if (guid == null || guid.trim().isEmpty()) { throw new InvalidSearchException("Guid cannot be empty"); } - org.elasticsearch.action.search.SearchResponse esResponse = getMetaAlertsForAlert(guid.trim()); - SearchResponse searchResponse = new SearchResponse(); - searchResponse.setTotal(esResponse.getHits().getTotalHits()); - searchResponse.setResults( - Arrays.stream(esResponse.getHits().getHits()).map(searchHit -> { - SearchResult searchResult = new SearchResult(); - searchResult.setId(searchHit.getId()); - searchResult.setSource(searchHit.getSource()); - searchResult.setScore(searchHit.getScore()); - searchResult.setIndex(searchHit.getIndex()); - return searchResult; - } - ).collect(Collectors.toList())); - return searchResponse; + // Searches for all alerts containing the meta alert guid in it's "metalerts" array + QueryBuilder qb = boolQuery() + .must( + nestedQuery( + ALERT_FIELD, + boolQuery() + .must(termQuery(ALERT_FIELD + "." 
+ GUID, guid)) + ).innerHit(new QueryInnerHitBuilder()) + ) + .must(termQuery(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString())); + return queryAllResults(qb); } @Override @SuppressWarnings("unchecked") public MetaAlertCreateResponse createMetaAlert(MetaAlertCreateRequest request) throws InvalidCreateException, IOException { - if (request.getGuidToIndices().isEmpty()) { - throw new InvalidCreateException("MetaAlertCreateRequest must contain alert GUIDs"); + List<GetRequest> alertRequests = request.getAlerts(); + if (request.getAlerts().isEmpty()) { + throw new InvalidCreateException("MetaAlertCreateRequest must contain alerts"); } if (request.getGroups().isEmpty()) { throw new InvalidCreateException("MetaAlertCreateRequest must contain UI groups"); } // Retrieve the documents going into the meta alert and build it - MultiGetResponse multiGetResponse = getDocumentsByGuid(request); - Document createDoc = buildCreateDocument(multiGetResponse, request.getGroups()); - MetaScores metaScores = calculateMetaScores(createDoc); - createDoc.getDocument().putAll(metaScores.getMetaScores()); - createDoc.getDocument().put(threatTriageField, metaScores.getMetaScores().get(threatSort)); + Iterable<Document> alerts = indexDao.getAllLatest(alertRequests); + + Document metaAlert = buildCreateDocument(alerts, request.getGroups()); + calculateMetaScores(metaAlert); // Add source type to be consistent with other sources and allow filtering - createDoc.getDocument().put("source:type", MetaAlertDao.METAALERT_TYPE); + metaAlert.getDocument().put(SOURCE_TYPE, MetaAlertDao.METAALERT_TYPE); // Start a list of updates / inserts we need to run Map<Document, Optional<String>> updates = new HashMap<>(); - updates.put(createDoc, Optional.of(MetaAlertDao.METAALERTS_INDEX)); + updates.put(metaAlert, Optional.of(MetaAlertDao.METAALERTS_INDEX)); try { // We need to update the associated alerts with the new meta alerts, making sure existing // links are maintained. 
- List<String> metaAlertField; - for (MultiGetItemResponse itemResponse : multiGetResponse) { - metaAlertField = new ArrayList<>(); - GetResponse response = itemResponse.getResponse(); - if (response.isExists()) { - List<String> alertField = (List<String>) response.getSourceAsMap() - .get(MetaAlertDao.METAALERT_FIELD); - if (alertField != null) { - metaAlertField.addAll(alertField); + Map<String, Optional<String>> guidToIndices = alertRequests.stream().collect(Collectors.toMap( + GetRequest::getGuid, GetRequest::getIndex)); + Map<String, String> guidToSensorTypes = alertRequests.stream().collect(Collectors.toMap( + GetRequest::getGuid, GetRequest::getSensorType)); + for (Document alert: alerts) { + if (addMetaAlertToAlert(metaAlert.getGuid(), alert)) { + // Use the index in the request if it exists + Optional<String> index = guidToIndices.get(alert.getGuid()); + if (!index.isPresent()) { + // Look up the index from Elasticsearch if one is not supplied in the request + index = elasticsearchDao.getIndexName(alert.getGuid(), guidToSensorTypes.get(alert.getGuid())); + if (!index.isPresent()) { + throw new IllegalArgumentException("Could not find index for " + alert.getGuid()); + } } + updates.put(alert, index); } - metaAlertField.add(createDoc.getGuid()); - - Document alertUpdate = buildAlertUpdate(response.getId(), - (String) response.getSource().get(SOURCE_TYPE), metaAlertField, - (Long) response.getSourceAsMap().get("_timestamp")); - updates.put(alertUpdate, Optional.of(itemResponse.getIndex())); } // Kick off any updates. 
@@ -206,7 +208,7 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao { MetaAlertCreateResponse createResponse = new MetaAlertCreateResponse(); createResponse.setCreated(true); - createResponse.setGuid(createDoc.getGuid()); + createResponse.setGuid(metaAlert.getGuid()); return createResponse; } catch (IOException ioe) { throw new InvalidCreateException("Unable to create meta alert", ioe); @@ -214,6 +216,149 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao { } @Override + public boolean addAlertsToMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests) + throws IOException { + Map<Document, Optional<String>> updates = new HashMap<>(); + Document metaAlert = indexDao.getLatest(metaAlertGuid, METAALERT_TYPE); + if (MetaAlertStatus.ACTIVE.getStatusString().equals(metaAlert.getDocument().get(STATUS_FIELD))) { + Iterable<Document> alerts = indexDao.getAllLatest(alertRequests); + boolean metaAlertUpdated = addAlertsToMetaAlert(metaAlert, alerts); + if (metaAlertUpdated) { + calculateMetaScores(metaAlert); + updates.put(metaAlert, Optional.of(index)); + for(Document alert: alerts) { + if (addMetaAlertToAlert(metaAlert.getGuid(), alert)) { + updates.put(alert, Optional.empty()); + } + } + indexDaoUpdate(updates); + } + return metaAlertUpdated; + } else { + throw new IllegalStateException("Adding alerts to an INACTIVE meta alert is not allowed"); + } + } + + protected boolean addAlertsToMetaAlert(Document metaAlert, Iterable<Document> alerts) { + boolean alertAdded = false; + List<Map<String,Object>> currentAlerts = (List<Map<String, Object>>) metaAlert.getDocument().get(ALERT_FIELD); + Set<String> currentAlertGuids = currentAlerts.stream().map(currentAlert -> + (String) currentAlert.get(GUID)).collect(Collectors.toSet()); + for (Document alert: alerts) { + String alertGuid = alert.getGuid(); + // Only add an alert if it isn't already in the meta alert + if (!currentAlertGuids.contains(alertGuid)) { + 
currentAlerts.add(alert.getDocument()); + alertAdded = true; + } + } + return alertAdded; + } + + protected boolean addMetaAlertToAlert(String metaAlertGuid, Document alert) { + List<String> metaAlertField = new ArrayList<>(); + List<String> alertField = (List<String>) alert.getDocument() + .get(MetaAlertDao.METAALERT_FIELD); + if (alertField != null) { + metaAlertField.addAll(alertField); + } + boolean metaAlertAdded = !metaAlertField.contains(metaAlertGuid); + if (metaAlertAdded) { + metaAlertField.add(metaAlertGuid); + alert.getDocument().put(MetaAlertDao.METAALERT_FIELD, metaAlertField); + } + return metaAlertAdded; + } + + @Override + public boolean removeAlertsFromMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests) + throws IOException { + Map<Document, Optional<String>> updates = new HashMap<>(); + Document metaAlert = indexDao.getLatest(metaAlertGuid, METAALERT_TYPE); + if (MetaAlertStatus.ACTIVE.getStatusString().equals(metaAlert.getDocument().get(STATUS_FIELD))) { + Iterable<Document> alerts = indexDao.getAllLatest(alertRequests); + Collection<String> alertGuids = alertRequests.stream().map(GetRequest::getGuid).collect( + Collectors.toList()); + boolean metaAlertUpdated = removeAlertsFromMetaAlert(metaAlert, alertGuids); + if (metaAlertUpdated) { + calculateMetaScores(metaAlert); + updates.put(metaAlert, Optional.of(index)); + for(Document alert: alerts) { + if (removeMetaAlertFromAlert(metaAlert.getGuid(), alert)) { + updates.put(alert, Optional.empty()); + } + } + indexDaoUpdate(updates); + } + return metaAlertUpdated; + } else { + throw new IllegalStateException("Removing alerts from an INACTIVE meta alert is not allowed"); + } + + } + + protected boolean removeAlertsFromMetaAlert(Document metaAlert, Collection<String> alertGuids) { + List<Map<String,Object>> currentAlerts = (List<Map<String, Object>>) metaAlert.getDocument().get(ALERT_FIELD); + int previousSize = currentAlerts.size(); + // Only remove an alert if it is in the meta alert + 
currentAlerts.removeIf(currentAlert -> alertGuids.contains((String) currentAlert.get(GUID))); + return currentAlerts.size() != previousSize; + } + + protected boolean removeMetaAlertFromAlert(String metaAlertGuid, Document alert) { + List<String> metaAlertField = new ArrayList<>(); + List<String> alertField = (List<String>) alert.getDocument() + .get(MetaAlertDao.METAALERT_FIELD); + if (alertField != null) { + metaAlertField.addAll(alertField); + } + boolean metaAlertRemoved = metaAlertField.remove(metaAlertGuid); + if (metaAlertRemoved) { + alert.getDocument().put(MetaAlertDao.METAALERT_FIELD, metaAlertField); + } + return metaAlertRemoved; + } + + @Override + public boolean updateMetaAlertStatus(String metaAlertGuid, MetaAlertStatus status) + throws IOException { + Map<Document, Optional<String>> updates = new HashMap<>(); + Document metaAlert = indexDao.getLatest(metaAlertGuid, METAALERT_TYPE); + String currentStatus = (String) metaAlert.getDocument().get(MetaAlertDao.STATUS_FIELD); + boolean metaAlertUpdated = !status.getStatusString().equals(currentStatus); + if (metaAlertUpdated) { + metaAlert.getDocument().put(MetaAlertDao.STATUS_FIELD, status.getStatusString()); + updates.put(metaAlert, Optional.of(index)); + List<GetRequest> getRequests = new ArrayList<>(); + List<Map<String, Object>> currentAlerts = (List<Map<String, Object>>) metaAlert.getDocument() + .get(MetaAlertDao.ALERT_FIELD); + currentAlerts.stream().forEach(currentAlert -> { + getRequests.add(new GetRequest((String) currentAlert.get(GUID), (String) currentAlert.get(SOURCE_TYPE))); + }); + Iterable<Document> alerts = indexDao.getAllLatest(getRequests); + for (Document alert : alerts) { + boolean metaAlertAdded = false; + boolean metaAlertRemoved = false; + // If we're making it active add add the meta alert guid for every alert. 
+ if (MetaAlertStatus.ACTIVE.equals(status)) { + metaAlertAdded = addMetaAlertToAlert(metaAlert.getGuid(), alert); + } + // If we're making it inactive, remove the meta alert guid from every alert. + if (MetaAlertStatus.INACTIVE.equals(status)) { + metaAlertRemoved = removeMetaAlertFromAlert(metaAlert.getGuid(), alert); + } + if (metaAlertAdded || metaAlertRemoved) { + updates.put(alert, Optional.empty()); + } + } + } + if (metaAlertUpdated) { + indexDaoUpdate(updates); + } + return metaAlertUpdated; + } + + @Override public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException { // Wrap the query to also get any meta-alerts. QueryBuilder qb = constantScoreQuery(boolQuery() @@ -242,95 +387,171 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao { } @Override + public Iterable<Document> getAllLatest( + List<GetRequest> getRequests) throws IOException { + return indexDao.getAllLatest(getRequests); + } + + @Override public void update(Document update, Optional<String> index) throws IOException { if (METAALERT_TYPE.equals(update.getSensorType())) { // We've been passed an update to the meta alert. - handleMetaUpdate(update); + throw new UnsupportedOperationException("Meta alerts cannot be directly updated"); } else { + Map<Document, Optional<String>> updates = new HashMap<>(); + updates.put(update, index); // We need to update an alert itself. Only that portion of the update can be delegated. // We still need to get meta alerts potentially associated with it and update. 
- org.elasticsearch.action.search.SearchResponse response = getMetaAlertsForAlert( - update.getGuid() - ); - - // Each hit, if any, is a metaalert that needs to be updated - for (SearchHit hit : response.getHits()) { - handleAlertUpdate(update, hit); + Collection<Document> metaAlerts = getMetaAlertsForAlert(update.getGuid()).getResults().stream() + .map(searchResult -> new Document(searchResult.getSource(), searchResult.getId(), METAALERT_TYPE, 0L)) + .collect(Collectors.toList()); + // Each meta alert needs to be updated with the new alert + for (Document metaAlert : metaAlerts) { + replaceAlertInMetaAlert(metaAlert, update); + updates.put(metaAlert, Optional.of(METAALERTS_INDEX)); } // Run the alert's update - indexDao.update(update, index); + indexDao.batchUpdate(updates); } } + protected boolean replaceAlertInMetaAlert(Document metaAlert, Document alert) { + boolean metaAlertUpdated = removeAlertsFromMetaAlert(metaAlert, Collections.singleton(alert.getGuid())); + if (metaAlertUpdated) { + addAlertsToMetaAlert(metaAlert, Collections.singleton(alert)); + } + return metaAlertUpdated; + } + @Override public void batchUpdate(Map<Document, Optional<String>> updates) throws IOException { throw new UnsupportedOperationException("Meta alerts do not allow for bulk updates"); } /** + * Does not allow patches on the "alerts" or "status" fields. These fields must be updated with their + * dedicated methods. + * + * @param request The patch request + * @param timestamp Optionally a timestamp to set. If not specified then current time is used. 
+ * @throws OriginalNotFoundException + * @throws IOException + */ + @Override + public void patch(PatchRequest request, Optional<Long> timestamp) + throws OriginalNotFoundException, IOException { + if (isPatchAllowed(request)) { + Document d = getPatchedDocument(request, timestamp); + indexDao.update(d, Optional.ofNullable(request.getIndex())); + } else { + throw new IllegalArgumentException("Meta alert patches are not allowed for /alert or /status paths. " + + "Please use the add/remove alert or update status functions instead."); + } + } + + protected boolean isPatchAllowed(PatchRequest request) { + Iterator patchIterator = request.getPatch().iterator(); + while(patchIterator.hasNext()) { + JsonNode patch = (JsonNode) patchIterator.next(); + String path = patch.path("path").asText(); + if (STATUS_PATH.equals(path) || ALERT_PATH.equals(path)) { + return false; + } + } + return true; + } + + /** * Given an alert GUID, retrieve all associated meta alerts. - * @param guid The GUID of the child alert + * @param alertGuid The GUID of the child alert * @return The Elasticsearch response containing the meta alerts */ - protected org.elasticsearch.action.search.SearchResponse getMetaAlertsForAlert(String guid) { + protected SearchResponse getMetaAlertsForAlert(String alertGuid) { QueryBuilder qb = boolQuery() .must( nestedQuery( ALERT_FIELD, boolQuery() - .must(termQuery(ALERT_FIELD + "." + Constants.GUID, guid)) + .must(termQuery(ALERT_FIELD + "." + GUID, alertGuid)) ).innerHit(new QueryInnerHitBuilder()) ) .must(termQuery(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString())); - SearchRequest sr = new SearchRequest(); - ArrayList<String> indices = new ArrayList<>(); - indices.add(index); - sr.setIndices(indices); - return elasticsearchDao + return queryAllResults(qb); + } + + /** + * Elasticsearch queries default to 10 records returned. Some internal queries require that all + * results are returned. 
Rather than setting an arbitrarily high size, this method pages through results + * and returns them all in a single SearchResponse. + * @param qb + * @return + */ + protected SearchResponse queryAllResults(QueryBuilder qb) { + SearchRequestBuilder searchRequestBuilder = elasticsearchDao .getClient() .prepareSearch(index) .addFields("*") .setFetchSource(true) .setQuery(qb) + .setSize(pageSize); + org.elasticsearch.action.search.SearchResponse esResponse = searchRequestBuilder .execute() .actionGet(); + List<SearchResult> allResults = getSearchResults(esResponse); + long total = esResponse.getHits().getTotalHits(); + if (total > pageSize) { + int pages = (int) (total / pageSize) + 1; + for (int i = 1; i < pages; i++) { + int from = i * pageSize; + searchRequestBuilder.setFrom(from); + esResponse = searchRequestBuilder + .execute() + .actionGet(); + allResults.addAll(getSearchResults(esResponse)); + } + } + SearchResponse searchResponse = new SearchResponse(); + searchResponse.setTotal(total); + searchResponse.setResults(allResults); + return searchResponse; } /** - * Return child documents after retrieving them from Elasticsearch. 
- * @param request The request detailing which child alerts we need - * @return The Elasticsearch response to our request for alerts + * Transforms a list of Elasticsearch SearchHits to a list of SearchResults + * @param searchResponse + * @return */ - protected MultiGetResponse getDocumentsByGuid(MetaAlertCreateRequest request) { - MultiGetRequestBuilder multiGet = elasticsearchDao.getClient().prepareMultiGet(); - for (Entry<String, String> entry : request.getGuidToIndices().entrySet()) { - multiGet.add(new Item(entry.getValue(), null, entry.getKey())); - } - return multiGet.get(); + protected List<SearchResult> getSearchResults(org.elasticsearch.action.search.SearchResponse searchResponse) { + return Arrays.stream(searchResponse.getHits().getHits()).map(searchHit -> { + SearchResult searchResult = new SearchResult(); + searchResult.setId(searchHit.getId()); + searchResult.setSource(searchHit.getSource()); + searchResult.setScore(searchHit.getScore()); + searchResult.setIndex(searchHit.getIndex()); + return searchResult; + } + ).collect(Collectors.toList()); } /** * Build the Document representing a meta alert to be created. - * @param multiGetResponse The Elasticsearch results for the meta alerts child documents + * @param alerts The Elasticsearch results for the meta alerts child documents * @param groups The groups used to create this meta alert * @return A Document representing the new meta alert */ - protected Document buildCreateDocument(MultiGetResponse multiGetResponse, List<String> groups) { + protected Document buildCreateDocument(Iterable<Document> alerts, List<String> groups) { // Need to create a Document from the multiget. 
Scores will be calculated later Map<String, Object> metaSource = new HashMap<>(); List<Map<String, Object>> alertList = new ArrayList<>(); - for (MultiGetItemResponse itemResponse : multiGetResponse) { - GetResponse response = itemResponse.getResponse(); - if (response.isExists()) { - alertList.add(response.getSource()); - } + for (Document alert: alerts) { + alertList.add(alert.getDocument()); } metaSource.put(ALERT_FIELD, alertList); // Add any meta fields String guid = UUID.randomUUID().toString(); - metaSource.put(Constants.GUID, guid); + metaSource.put(GUID, guid); metaSource.put(Constants.Fields.TIMESTAMP.getName(), System.currentTimeMillis()); metaSource.put(GROUPS_FIELD, groups); metaSource.put(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString()); @@ -339,29 +560,6 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao { } /** - * Process an update to a meta alert itself. - * @param update The update Document to be applied - * @throws IOException If there's a problem running the update - */ - protected void handleMetaUpdate(Document update) throws IOException { - Map<Document, Optional<String>> updates = new HashMap<>(); - - if (update.getDocument().containsKey(MetaAlertDao.STATUS_FIELD)) { - // Update all associated alerts to maintain the meta alert link properly - updates.putAll(buildStatusAlertUpdates(update)); - } - if (update.getDocument().containsKey(MetaAlertDao.ALERT_FIELD)) { - // If the alerts field changes (i.e. add/remove alert), update all affected alerts to - // maintain the meta alert link properly. - updates.putAll(buildAlertFieldUpdates(update)); - } - - // Run meta alert update. - updates.put(update, Optional.of(index)); - indexDaoUpdate(updates); - } - - /** * Calls the single update variant if there's only one update, otherwise calls batch. 
* @param updates The list of updates to run * @throws IOException If there's an update error @@ -375,203 +573,6 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao { } // else we have no updates, so don't do anything } - protected Map<Document, Optional<String>> buildStatusAlertUpdates(Document update) - throws IOException { - Map<Document, Optional<String>> updates = new HashMap<>(); - List<Map<String, Object>> alerts = getAllAlertsForMetaAlert(update); - for (Map<String, Object> alert : alerts) { - // Retrieve the associated alert, so we can update the array - List<String> metaAlertField = new ArrayList<>(); - @SuppressWarnings("unchecked") - List<String> alertField = (List<String>) alert.get(MetaAlertDao.METAALERT_FIELD); - if (alertField != null) { - metaAlertField.addAll(alertField); - } - String status = (String) update.getDocument().get(MetaAlertDao.STATUS_FIELD); - - Document alertUpdate = null; - String alertGuid = (String) alert.get(Constants.GUID); - // If we're making it active add add the meta alert guid for every alert. - if (MetaAlertStatus.ACTIVE.getStatusString().equals(status) - && !metaAlertField.contains(update.getGuid())) { - metaAlertField.add(update.getGuid()); - alertUpdate = buildAlertUpdate( - alertGuid, - (String) alert.get(SOURCE_TYPE), - metaAlertField, - (Long) alert.get("_timestamp") - ); - } - - // If we're making it inactive, remove the meta alert guid from every alert. - if (MetaAlertStatus.INACTIVE.getStatusString().equals(status) - && metaAlertField.remove(update.getGuid())) { - alertUpdate = buildAlertUpdate( - alertGuid, - (String) alert.get(SOURCE_TYPE), - metaAlertField, - (Long) alert.get("_timestamp") - ); - } - - // Only run an alert update if we have an actual update. 
- if (alertUpdate != null) { - updates.put(alertUpdate, Optional.empty()); - } - } - return updates; - } - - protected Map<Document, Optional<String>> buildAlertFieldUpdates(Document update) - throws IOException { - Map<Document, Optional<String>> updates = new HashMap<>(); - // If we've updated the alerts field (i.e add/remove), recalculate meta alert scores and - // the metaalerts fields for updating the children alerts. - MetaScores metaScores = calculateMetaScores(update); - update.getDocument().putAll(metaScores.getMetaScores()); - update.getDocument().put(threatTriageField, metaScores.getMetaScores().get(threatSort)); - - // Get the set of GUIDs that are in the new version. - Set<String> updateGuids = new HashSet<>(); - @SuppressWarnings("unchecked") - List<Map<String, Object>> updateAlerts = (List<Map<String, Object>>) update.getDocument() - .get(MetaAlertDao.ALERT_FIELD); - for (Map<String, Object> alert : updateAlerts) { - updateGuids.add((String) alert.get(Constants.GUID)); - } - - // Get the set of GUIDs from the old version - List<Map<String, Object>> alerts = getAllAlertsForMetaAlert(update); - Set<String> currentGuids = new HashSet<>(); - for (Map<String, Object> alert : alerts) { - currentGuids.add((String) alert.get(Constants.GUID)); - } - - // Get both set differences, so we know what's been added and removed. 
- Set<String> removedGuids = SetUtils.difference(currentGuids, updateGuids); - Set<String> addedGuids = SetUtils.difference(updateGuids, currentGuids); - - Document alertUpdate; - - // Handle any removed GUIDs - for (String guid : removedGuids) { - // Retrieve the associated alert, so we can update the array - Document alert = elasticsearchDao.getLatest(guid, null); - List<String> metaAlertField = new ArrayList<>(); - @SuppressWarnings("unchecked") - List<String> alertField = (List<String>) alert.getDocument() - .get(MetaAlertDao.METAALERT_FIELD); - if (alertField != null) { - metaAlertField.addAll(alertField); - } - if (metaAlertField.remove(update.getGuid())) { - alertUpdate = buildAlertUpdate(guid, alert.getSensorType(), metaAlertField, - alert.getTimestamp()); - updates.put(alertUpdate, Optional.empty()); - } - } - - // Handle any added GUIDs - for (String guid : addedGuids) { - // Retrieve the associated alert, so we can update the array - Document alert = elasticsearchDao.getLatest(guid, null); - List<String> metaAlertField = new ArrayList<>(); - @SuppressWarnings("unchecked") - List<String> alertField = (List<String>) alert.getDocument() - .get(MetaAlertDao.METAALERT_FIELD); - if (alertField != null) { - metaAlertField.addAll(alertField); - } - metaAlertField.add(update.getGuid()); - alertUpdate = buildAlertUpdate(guid, alert.getSensorType(), metaAlertField, - alert.getTimestamp()); - updates.put(alertUpdate, Optional.empty()); - } - - return updates; - } - - @SuppressWarnings("unchecked") - protected List<Map<String, Object>> getAllAlertsForMetaAlert(Document update) throws IOException { - Document latest = indexDao.getLatest(update.getGuid(), MetaAlertDao.METAALERT_TYPE); - if (latest == null) { - return new ArrayList<>(); - } - List<String> guids = new ArrayList<>(); - List<Map<String, Object>> latestAlerts = (List<Map<String, Object>>) latest.getDocument() - .get(MetaAlertDao.ALERT_FIELD); - for (Map<String, Object> alert : latestAlerts) { - 
guids.add((String) alert.get(Constants.GUID)); - } - - List<Map<String, Object>> alerts = new ArrayList<>(); - QueryBuilder query = QueryBuilders.idsQuery().ids(guids); - SearchRequestBuilder request = elasticsearchDao.getClient().prepareSearch() - .setQuery(query); - org.elasticsearch.action.search.SearchResponse response = request.get(); - for (SearchHit hit : response.getHits().getHits()) { - alerts.add(hit.sourceAsMap()); - } - return alerts; - } - - /** - * Builds an update Document for updating the meta alerts list. - * @param alertGuid The GUID of the alert to update - * @param sensorType The sensor type to update - * @param metaAlertField The new metaAlertList to use - * @return The update Document - */ - protected Document buildAlertUpdate(String alertGuid, String sensorType, - List<String> metaAlertField, Long timestamp) { - Document alertUpdate; - Map<String, Object> document = new HashMap<>(); - document.put(MetaAlertDao.METAALERT_FIELD, metaAlertField); - alertUpdate = new Document( - document, - alertGuid, - sensorType, - timestamp - ); - return alertUpdate; - } - - /** - * Takes care of upserting a child alert to a meta alert. 
- * @param update The update Document to be applied - * @param hit The meta alert to be updated - * @throws IOException If there's an issue running the upsert - */ - protected void handleAlertUpdate(Document update, SearchHit hit) throws IOException { - XContentBuilder builder = buildUpdatedMetaAlert(update, hit); - - // Run the meta alert's update - IndexRequest indexRequest = new IndexRequest( - METAALERTS_INDEX, - METAALERT_DOC, - hit.getId() - ).source(builder); - UpdateRequest updateRequest = new UpdateRequest( - METAALERTS_INDEX, - METAALERT_DOC, - hit.getId() - ).doc(builder).upsert(indexRequest); - try { - UpdateResponse updateResponse = elasticsearchDao.getClient().update(updateRequest).get(); - - ShardInfo shardInfo = updateResponse.getShardInfo(); - int failed = shardInfo.getFailed(); - if (failed > 0) { - throw new IOException( - "ElasticsearchMetaAlertDao upsert failed: " - + Arrays.toString(shardInfo.getFailures()) - ); - } - } catch (Exception e) { - throw new IOException(e.getMessage(), e); - } - } - @Override public Map<String, Map<String, FieldType>> getColumnMetadata(List<String> indices) throws IOException { @@ -595,80 +596,26 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao { /** * Calculate the meta alert scores for a Document. 
- * @param document The Document containing scores + * @param metaAlert The Document containing scores * @return Set of score statistics */ @SuppressWarnings("unchecked") - protected MetaScores calculateMetaScores(Document document) { - List<Object> alertsRaw = ((List<Object>) document.getDocument().get(ALERT_FIELD)); - if (alertsRaw == null || alertsRaw.isEmpty()) { - throw new IllegalArgumentException("No alerts to use in calculation for doc GUID: " - + document.getDocument().get(Constants.GUID)); - } - - ArrayList<Double> scores = new ArrayList<>(); - for (Object alertRaw : alertsRaw) { - Map<String, Object> alert = (Map<String, Object>) alertRaw; - Double scoreNum = parseThreatField(alert.get(threatTriageField)); - if (scoreNum != null) { - scores.add(scoreNum); - } - } - - return new MetaScores(scores); - } - - /** - * Builds the updated meta alert based on the update. - * @param update The update Document for the meta alert - * @param hit The meta alert to be updated - * @return A builder for Elasticsearch to use - * @throws IOException If we have an issue building the result - */ - protected XContentBuilder buildUpdatedMetaAlert(Document update, SearchHit hit) - throws IOException { - // Make sure to get all the threat scores while we're going through the docs - List<Double> scores = new ArrayList<>(); - // Start building the new version of the metaalert - XContentBuilder builder = jsonBuilder().startObject(); - - // Run through the nested alerts of the meta alert and either use the new or old versions - builder.startArray(ALERT_FIELD); - Map<String, Object> hitAlerts = hit.sourceAsMap(); - - @SuppressWarnings("unchecked") - List<Map<String, Object>> alertHits = (List<Map<String, Object>>) hitAlerts.get(ALERT_FIELD); - for (Map<String, Object> alertHit : alertHits) { - Map<String, Object> docMap = alertHit; - // If we're at the update use it instead of the original - if (alertHit.get(Constants.GUID).equals(update.getGuid())) { - docMap = 
update.getDocument(); - } - builder.map(docMap); - - // Handle either String or Number values in the threatTriageField - Object threatRaw = docMap.get(threatTriageField); - Double threat = parseThreatField(threatRaw); - - if (threat != null) { - scores.add(threat); - } - } - builder.endArray(); - - // Add all the meta alert fields, and score calculation - Map<String, Object> updatedMeta = new HashMap<>(); - updatedMeta.putAll(hit.getSource()); - updatedMeta.putAll(new MetaScores(scores).getMetaScores()); - for (Entry<String, Object> entry : updatedMeta.entrySet()) { - // The alerts field is being added separately, so ignore the original - if (!(entry.getKey().equals(ALERT_FIELD))) { - builder.field(entry.getKey(), entry.getValue()); + protected void calculateMetaScores(Document metaAlert) { + MetaScores metaScores = new MetaScores(new ArrayList<>()); + List<Object> alertsRaw = ((List<Object>) metaAlert.getDocument().get(ALERT_FIELD)); + if (alertsRaw != null && !alertsRaw.isEmpty()) { + ArrayList<Double> scores = new ArrayList<>(); + for (Object alertRaw : alertsRaw) { + Map<String, Object> alert = (Map<String, Object>) alertRaw; + Double scoreNum = parseThreatField(alert.get(threatTriageField)); + if (scoreNum != null) { + scores.add(scoreNum); + } } + metaScores = new MetaScores(scores); } - builder.endObject(); - - return builder; + metaAlert.getDocument().putAll(metaScores.getMetaScores()); + metaAlert.getDocument().put(threatTriageField, metaScores.getMetaScores().get(threatSort)); } private Double parseThreatField(Object threatRaw) { @@ -680,4 +627,12 @@ public class ElasticsearchMetaAlertDao implements MetaAlertDao { } return threat; } + + public int getPageSize() { + return pageSize; + } + + public void setPageSize(int pageSize) { + this.pageSize = pageSize; + } } http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/MetaAlertStatus.java 
---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/MetaAlertStatus.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/MetaAlertStatus.java deleted file mode 100644 index 6c8e858..0000000 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/MetaAlertStatus.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.metron.elasticsearch.dao; - -public enum MetaAlertStatus { - ACTIVE("active"), - INACTIVE("inactive"); - - private String statusString; - - MetaAlertStatus(String statusString) { - this.statusString = statusString; - } - - public String getStatusString() { - return statusString; - } -}
