METRON-1158 Build backend for grouping alerts into meta alerts (justinleet) closes apache/metron#734
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/40c93527 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/40c93527 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/40c93527 Branch: refs/heads/master Commit: 40c93527e2a693ec6580dc0d09356dfa3b525aa4 Parents: 309d375 Author: justinleet <[email protected]> Authored: Wed Sep 13 11:38:05 2017 -0400 Committer: leet <[email protected]> Committed: Wed Sep 13 11:38:05 2017 -0400 ---------------------------------------------------------------------- .../CURRENT/package/files/bro_index.template | 3 + .../CURRENT/package/files/error_index.template | 3 + .../CURRENT/package/files/meta_index.mapping | 42 ++ .../CURRENT/package/files/snort_index.template | 3 + .../CURRENT/package/files/yaf_index.template | 3 + .../CURRENT/package/scripts/indexing_master.py | 8 + .../package/scripts/params/params_linux.py | 1 + metron-interface/metron-rest/README.md | 18 + .../apache/metron/rest/MetronRestConstants.java | 3 + .../apache/metron/rest/config/IndexConfig.java | 16 +- .../rest/controller/MetaAlertController.java | 64 +++ .../metron/rest/service/MetaAlertService.java | 31 ++ .../rest/service/impl/MetaAlertServiceImpl.java | 66 +++ .../rest/service/impl/SearchServiceImpl.java | 1 + .../src/main/resources/application-test.yml | 5 + .../src/main/resources/application.yml | 4 + .../rest/controller/DaoControllerTest.java | 20 +- .../MetaAlertControllerIntegrationTest.java | 174 ++++++++ .../SearchControllerIntegrationTest.java | 8 +- .../UpdateControllerIntegrationTest.java | 20 +- .../elasticsearch/dao/ElasticsearchDao.java | 57 ++- .../dao/ElasticsearchMetaAlertDao.java | 446 +++++++++++++++++++ .../elasticsearch/dao/MetaAlertStatus.java | 34 ++ .../dao/ElasticsearchMetaAlertDaoTest.java | 427 ++++++++++++++++++ .../ElasticsearchMetaAlertIntegrationTest.java | 317 +++++++++++++ .../ElasticsearchSearchIntegrationTest.java | 18 +- .../ElasticsearchUpdateIntegrationTest.java | 3 + .../components/ElasticSearchComponent.java | 15 + metron-platform/metron-indexing/README.md | 17 + .../metron/indexing/dao/MetaAlertDao.java | 72 +++ .../metron/indexing/dao/MultiIndexDao.java | 4 + .../dao/metaalert/MetaAlertCreateRequest.java | 51 +++ .../dao/metaalert/MetaAlertCreateResponse.java | 31 ++ .../indexing/dao/metaalert/MetaScores.java | 54 +++ .../metron/indexing/dao/search/FieldType.java | 2 + .../dao/search/InvalidCreateException.java | 28 ++ .../indexing/dao/search/SearchResult.java | 10 + .../metron/indexing/dao/update/Document.java | 13 +- .../apache/metron/indexing/dao/InMemoryDao.java | 38 +- .../indexing/dao/InMemoryMetaAlertDao.java | 198 ++++++++ .../indexing/dao/SearchIntegrationTest.java | 77 +++- .../stellar/dsl/functions/BasicStellarTest.java | 5 + pom.xml | 1 + 43 files changed, 2357 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/40c93527/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template index 18c5d9b..7db006e 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template @@ -151,6 +151,9 @@ "type": "string", "index": "not_analyzed" }, + "alert": { + "type": "nested" + }, "ip_src_addr": { "type": "ip" }, http://git-wip-us.apache.org/repos/asf/metron/blob/40c93527/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/error_index.template ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/error_index.template b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/error_index.template index 3bb4633..e79d482 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/error_index.template +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/error_index.template @@ -50,6 +50,9 @@ "error_type": { "type": "string", "index": "not_analyzed" + }, + "alert": { + "type": "nested" } } } http://git-wip-us.apache.org/repos/asf/metron/blob/40c93527/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/meta_index.mapping ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/meta_index.mapping b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/meta_index.mapping new file mode 100644 index 0000000..c42343e --- /dev/null +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/meta_index.mapping @@ -0,0 +1,42 @@ +{ + "mappings": { + "metaalert_doc": { + "_timestamp": { + "enabled": true + }, + "dynamic_templates": [ + { + "alert_template": { + "path_match": "alert.*", + "match_mapping_type": "string", + "mapping": { + "type": "string", + "index": "not_analyzed" + } + } + } + ], + "properties": { + "guid": { + "type": "string", + "index": "not_analyzed" + }, + "score": { + "type": "string", + "index": "not_analyzed" + }, + "status": { + "type": "string", + "index": "not_analyzed" + }, + "timestamp": { + "type": "date", + "format": "epoch_millis" + }, + "alert": { + "type": "nested" + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/40c93527/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/snort_index.template ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/snort_index.template b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/snort_index.template index 2311cf2..f13a9ee 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/snort_index.template +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/snort_index.template @@ -203,6 +203,9 @@ }, "ttl": { "type": "integer" + }, + "alert": { + "type": "nested" } } } http://git-wip-us.apache.org/repos/asf/metron/blob/40c93527/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/yaf_index.template ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/yaf_index.template b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/yaf_index.template index bd90929..d84235d 100644 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/yaf_index.template +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/yaf_index.template @@ -225,6 +225,9 @@ }, "end-reason": { "type": "string" + }, + "alert": { + "type": "nested" } } } http://git-wip-us.apache.org/repos/asf/metron/blob/40c93527/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py index 71dcc74..68e238a 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/indexing_master.py @@ -124,6 +124,11 @@ class Indexing(Script): content=StaticFile('error_index.template') ) + File(params.meta_index_path, + mode=0755, + content=StaticFile('meta_index.mapping') + ) + bro_cmd = ambari_format( 'curl -s -XPOST http://{es_http_url}/_template/bro_index -d @{bro_index_path}') Execute(bro_cmd, logoutput=True) @@ -136,6 +141,9 @@ class Indexing(Script): error_cmd = ambari_format( 'curl -s -XPOST http://{es_http_url}/_template/error_index -d @{error_index_path}') Execute(error_cmd, logoutput=True) + error_cmd = ambari_format( + 'curl -s -XPOST http://{es_http_url}/metaalerts -d @{meta_index_path}') + Execute(error_cmd, logoutput=True) def elasticsearch_template_delete(self, env): from params import params http://git-wip-us.apache.org/repos/asf/metron/blob/40c93527/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py ---------------------------------------------------------------------- diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py index a9d00dd..72f295b 100755 --- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py +++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py @@ -188,6 +188,7 @@ bro_index_path = tmp_dir + "/bro_index.template" snort_index_path = tmp_dir + "/snort_index.template" yaf_index_path = tmp_dir + "/yaf_index.template" error_index_path = tmp_dir + "/error_index.template" +meta_index_path = tmp_dir + "/meta_index.mapping" # Zeppelin Notebooks metron_config_zeppelin_path = format("{metron_config_path}/zeppelin") http://git-wip-us.apache.org/repos/asf/metron/blob/40c93527/metron-interface/metron-rest/README.md ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/README.md b/metron-interface/metron-rest/README.md index 27e04a3..97ab95c 100644 --- a/metron-interface/metron-rest/README.md +++ b/metron-interface/metron-rest/README.md @@ -200,6 +200,9 @@ Request and Response objects are JSON formatted. The JSON schemas are available | [ `GET /api/v1/kafka/topic/{name}`](#get-apiv1kafkatopicname)| | [ `DELETE /api/v1/kafka/topic/{name}`](#delete-apiv1kafkatopicname)| | [ `GET /api/v1/kafka/topic/{name}/sample`](#get-apiv1kafkatopicnamesample)| +| [ `GET /api/v1/metaalert/searchByAlert`](#get-apiv1metaalertsearchbyalert)| +| [ `GET /api/v1/metaalert/create`](#get-apiv1metaalertcreate)| +| [ `GET /api/v1/search/search`](#get-apiv1searchsearch)| | [ `POST /api/v1/search/search`](#get-apiv1searchsearch)| | [ `POST /api/v1/search/group`](#get-apiv1searchgroup)| | [ `GET /api/v1/search/findOne`](#get-apiv1searchfindone)| @@ -365,6 +368,21 @@ Request and Response objects are JSON formatted. The JSON schemas are available * 200 - Returns sample message * 404 - Either Kafka topic is missing or contains no messages +### `POST /api/v1/metaalert/searchByAlert` + * Description: Searches meta alerts to find any containing an alert for the provided GUID + * Input: + * guid - GUID of the alert + * Returns: + * 200 - Returns the meta alerts associated with this alert + * 404 - The child alert isn't found + +### `POST /api/v1/metaalert/create` + * Description: Creates a meta alert containing the provide alerts + * Input: + * request - Meta Alert Create Request + * Returns: + * 200 - The meta alert was created + ### `POST /api/v1/search/search` * Description: Searches the indexing store * Input: http://git-wip-us.apache.org/repos/asf/metron/blob/40c93527/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java index c5b3c13..b0f553f 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java @@ -59,4 +59,7 @@ public class MetronRestConstants { public static final String SEARCH_MAX_GROUPS = "search.max.groups"; public static final String INDEX_DAO_IMPL = "index.dao.impl"; public static final String INDEX_HBASE_TABLE_PROVIDER_IMPL = "index.hbase.provider"; + + public static final String META_DAO_IMPL = "meta.dao.impl"; + public static final String META_DAO_SORT = "meta.dao.sort"; } http://git-wip-us.apache.org/repos/asf/metron/blob/40c93527/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java index b6ac5e7..8eabb2e 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/IndexConfig.java @@ -24,6 +24,7 @@ import org.apache.metron.hbase.TableProvider; import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.IndexDao; import org.apache.metron.indexing.dao.IndexDaoFactory; +import org.apache.metron.indexing.dao.MetaAlertDao; import org.apache.metron.rest.MetronRestConstants; import org.apache.metron.rest.RestException; import org.apache.metron.rest.service.GlobalConfigService; @@ -53,6 +54,8 @@ public class IndexConfig { String indexDaoImpl = environment.getProperty(MetronRestConstants.INDEX_DAO_IMPL, String.class, null); int searchMaxResults = environment.getProperty(MetronRestConstants.SEARCH_MAX_RESULTS, Integer.class, 1000); int searchMaxGroups = environment.getProperty(MetronRestConstants.SEARCH_MAX_GROUPS, Integer.class, 1000); + String metaDaoImpl = environment.getProperty(MetronRestConstants.META_DAO_IMPL, String.class, null); + String metaDaoSort = environment.getProperty(MetronRestConstants.META_DAO_SORT, String.class, null); AccessConfig config = new AccessConfig(); config.setMaxSearchResults(searchMaxResults); config.setMaxSearchGroups(searchMaxGroups); @@ -67,10 +70,18 @@ public class IndexConfig { if (indexDaoImpl == null) { throw new IllegalStateException("You must provide an index DAO implementation via the " + INDEX_DAO_IMPL + " config"); } - IndexDao ret = IndexDaoFactory.combine(IndexDaoFactory.create(indexDaoImpl, config)); - if (ret == null) { + IndexDao indexDao = IndexDaoFactory.combine(IndexDaoFactory.create(indexDaoImpl, config)); + if (indexDao == null) { throw new IllegalStateException("IndexDao is unable to be created."); } + if (metaDaoImpl == null) { + // We're not using meta alerts. + return indexDao; + } + + // Create the meta alert dao and wrap it around the index dao. + MetaAlertDao ret = (MetaAlertDao) IndexDaoFactory.create(metaDaoImpl, config).get(0); + ret.init(indexDao, metaDaoSort); return ret; } catch(RuntimeException re) { @@ -80,5 +91,4 @@ public class IndexConfig { throw new IllegalStateException("Unable to create index DAO: " + e.getMessage(), e); } } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/metron/blob/40c93527/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/MetaAlertController.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/MetaAlertController.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/MetaAlertController.java new file mode 100644 index 0000000..e9cff8b --- /dev/null +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/controller/MetaAlertController.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.rest.controller; + +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import io.swagger.annotations.ApiResponse; +import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest; +import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse; +import org.apache.metron.indexing.dao.search.SearchResponse; +import org.apache.metron.rest.RestException; +import org.apache.metron.rest.service.MetaAlertService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.RestController; + +@RestController +@RequestMapping("/api/v1/metaalert") +public class MetaAlertController { + + @Autowired + private MetaAlertService metaAlertService; + + @ApiOperation(value = "Get all meta alerts for alert") + @ApiResponse(message = "Search results", code = 200) + @RequestMapping(value = "/searchByAlert", method = RequestMethod.POST) + ResponseEntity<SearchResponse> searchByAlert( + @ApiParam(name = "guid", value = "GUID", required = true) + @RequestBody final String guid + ) throws RestException { + return new ResponseEntity<>(metaAlertService.getAllMetaAlertsForAlert(guid), HttpStatus.OK); + } + + @ApiOperation(value = "Create a meta alert") + @ApiResponse(message = "Created meta alert", code = 200) + @RequestMapping(value = "/create", method = RequestMethod.POST) + ResponseEntity<MetaAlertCreateResponse> create( + @ApiParam(name = "request", value = "Meta Alert Create Request", required = true) + @RequestBody final MetaAlertCreateRequest createRequest + ) throws RestException { + return new ResponseEntity<>(metaAlertService.create(createRequest), HttpStatus.OK); + } +} + http://git-wip-us.apache.org/repos/asf/metron/blob/40c93527/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/MetaAlertService.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/MetaAlertService.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/MetaAlertService.java new file mode 100644 index 0000000..c339506 --- /dev/null +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/MetaAlertService.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.rest.service; + +import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest; +import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse; +import org.apache.metron.indexing.dao.search.SearchResponse; +import org.apache.metron.rest.RestException; + +public interface MetaAlertService { + + MetaAlertCreateResponse create(MetaAlertCreateRequest createRequest) throws RestException; + + SearchResponse getAllMetaAlertsForAlert(String guid) throws RestException; +} http://git-wip-us.apache.org/repos/asf/metron/blob/40c93527/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java new file mode 100644 index 0000000..f120c9e --- /dev/null +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/MetaAlertServiceImpl.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.rest.service.impl; + +import java.io.IOException; +import org.apache.metron.indexing.dao.IndexDao; +import org.apache.metron.indexing.dao.MetaAlertDao; +import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest; +import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse; +import org.apache.metron.indexing.dao.search.InvalidCreateException; +import org.apache.metron.indexing.dao.search.InvalidSearchException; +import org.apache.metron.indexing.dao.search.SearchRequest; +import org.apache.metron.indexing.dao.search.SearchResponse; +import org.apache.metron.rest.RestException; +import org.apache.metron.rest.service.MetaAlertService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.env.Environment; +import org.springframework.stereotype.Service; + +@Service +public class MetaAlertServiceImpl implements MetaAlertService { + private MetaAlertDao dao; + private Environment environment; + + @Autowired + public MetaAlertServiceImpl(IndexDao indexDao, Environment environment) { + // By construction this is always a meta alert dao + this.dao = (MetaAlertDao) indexDao; + this.environment = environment; + } + + + @Override + public MetaAlertCreateResponse create(MetaAlertCreateRequest createRequest) throws RestException { + try { + return dao.createMetaAlert(createRequest); + } catch (InvalidCreateException | IOException e) { + throw new RestException(e.getMessage(), e); + } + } + + @Override + public SearchResponse getAllMetaAlertsForAlert(String guid) throws RestException { + try { + return dao.getAllMetaAlertsForAlert(guid); + } catch (InvalidSearchException ise) { + throw new RestException(ise.getMessage(), ise); + } + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/40c93527/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java index d865e0e..326ee02 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SearchServiceImpl.java @@ -76,6 +76,7 @@ public class SearchServiceImpl implements SearchService { } } + @Override public Map<String, Map<String, FieldType>> getColumnMetadata(List<String> indices) throws RestException { try { return dao.getColumnMetadata(indices); http://git-wip-us.apache.org/repos/asf/metron/blob/40c93527/metron-interface/metron-rest/src/main/resources/application-test.yml ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/resources/application-test.yml b/metron-interface/metron-rest/src/main/resources/application-test.yml index b5e65a7..749dec4 100644 --- a/metron-interface/metron-rest/src/main/resources/application-test.yml +++ b/metron-interface/metron-rest/src/main/resources/application-test.yml @@ -51,3 +51,8 @@ index: hbase: # HBase is provided via a mock provider, so no actual HBase infrastructure is started. provider: org.apache.metron.hbase.mock.MockHBaseTableProvider + +meta: + dao: + # By default, we use the InMemoryMetaAlertDao for our tests + impl: org.apache.metron.indexing.dao.InMemoryMetaAlertDao http://git-wip-us.apache.org/repos/asf/metron/blob/40c93527/metron-interface/metron-rest/src/main/resources/application.yml ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/resources/application.yml b/metron-interface/metron-rest/src/main/resources/application.yml index 3aa5fd9..764bd40 100644 --- a/metron-interface/metron-rest/src/main/resources/application.yml +++ b/metron-interface/metron-rest/src/main/resources/application.yml @@ -54,3 +54,7 @@ index: # By default, we use the ElasticsearchDao and HBaseDao for backing updates. impl: org.apache.metron.elasticsearch.dao.ElasticsearchDao,org.apache.metron.indexing.dao.HBaseDao +meta: + dao: + # By default, we use the ElasticsearchMetaAlertDao + impl: org.apache.metron.elasticsearch.dao.ElasticsearchMetaAlertDao http://git-wip-us.apache.org/repos/asf/metron/blob/40c93527/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/DaoControllerTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/DaoControllerTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/DaoControllerTest.java index 096f1be..bd3f5bd 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/DaoControllerTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/DaoControllerTest.java @@ -17,10 +17,8 @@ */ package org.apache.metron.rest.controller; -import com.google.common.collect.ImmutableMap; import org.apache.metron.common.Constants; import org.apache.metron.indexing.dao.InMemoryDao; -import org.apache.metron.indexing.dao.SearchIntegrationTest; import org.json.simple.JSONArray; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; @@ -34,22 +32,20 @@ import java.util.Map; public class DaoControllerTest { public static final String TABLE = "updates"; public static final String CF = "t"; - public void loadTestData() throws ParseException { + public void loadTestData(Map<String, String> indicesToDataMap) throws ParseException { Map<String, List<String>> backingStore = new HashMap<>(); - for(Map.Entry<String, String> indices : - ImmutableMap.of( - "bro_index_2017.01.01.01", SearchIntegrationTest.broData, - "snort_index_2017.01.01.01", SearchIntegrationTest.snortData - ).entrySet() - ) + for(Map.Entry<String, String> indices : indicesToDataMap.entrySet()) { List<String> results = new ArrayList<>(); backingStore.put(indices.getKey(), results); - JSONArray broArray = (JSONArray) new JSONParser().parse(indices.getValue()); + JSONArray docArray = (JSONArray) new JSONParser().parse(indices.getValue()); int i = 0; - for(Object o: broArray) { + for(Object o: docArray) { JSONObject jsonObject = (JSONObject) o; - jsonObject.put(Constants.GUID, indices.getKey() + ":" + i++); + // Don't replace the GUID if we've already provided one + if (!jsonObject.containsKey(Constants.GUID)) { + jsonObject.put(Constants.GUID, indices.getKey() + ":" + i++); + } results.add(jsonObject.toJSONString()); } } http://git-wip-us.apache.org/repos/asf/metron/blob/40c93527/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java new file mode 100644 index 0000000..983c207 --- /dev/null +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/MetaAlertControllerIntegrationTest.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.rest.controller; + +import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE; +import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf; +import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic; +import static org.springframework.security.test.web.servlet.setup.SecurityMockMvcConfigurers.springSecurity; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.content; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + +import com.google.common.collect.ImmutableMap; +import org.adrianwalker.multilinestring.Multiline; +import org.apache.curator.framework.CuratorFramework; +import org.apache.metron.indexing.dao.MetaAlertDao; +import org.apache.metron.indexing.dao.SearchIntegrationTest; +import org.apache.metron.rest.service.MetaAlertService; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.http.MediaType; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.test.web.servlet.MockMvc; +import org.springframework.test.web.servlet.ResultActions; +import org.springframework.test.web.servlet.setup.MockMvcBuilders; +import org.springframework.web.context.WebApplicationContext; + +@RunWith(SpringRunner.class) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +@ActiveProfiles(TEST_PROFILE) +public class MetaAlertControllerIntegrationTest extends DaoControllerTest { + + @Autowired + private MetaAlertService metaAlertService; + @Autowired + public CuratorFramework client; + + @Autowired + private WebApplicationContext wac; + + private MockMvc mockMvc; + + private String metaalertUrl = "/api/v1/metaalert"; + private String user = "user"; + private String password = "password"; + + /** + { + "guidToIndices" : { + "bro_1":"bro_index_2017.01.01.01", + "snort_2":"snort_index_2017.01.01.01" + }, + "groups" : ["group_one", "group_two"] + } + */ + @Multiline + public static String create; + + @Before + public void setup() throws Exception { + this.mockMvc = MockMvcBuilders.webAppContextSetup(this.wac).apply(springSecurity()).build(); + ImmutableMap<String, String> testData = ImmutableMap.of( + "bro_index_2017.01.01.01", SearchIntegrationTest.broData, + "snort_index_2017.01.01.01", SearchIntegrationTest.snortData, + MetaAlertDao.METAALERTS_INDEX, SearchIntegrationTest.metaAlertData + ); + loadTestData(testData); + } + + @Test + public void test() throws Exception { + // Testing searching by alert + // Test no meta alert + String guid = "missing_1"; + ResultActions result = this.mockMvc.perform( + post(metaalertUrl + "/searchByAlert") + .with(httpBasic(user, password)).with(csrf()) + .contentType(MediaType.parseMediaType("text/plain;charset=UTF-8")) + .content(guid)); + result.andExpect(status().isOk()) + .andExpect( + content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) + .andExpect(jsonPath("$.total").value(0)); + + // Test single meta alert + guid = "snort_1"; + result = this.mockMvc.perform( + post(metaalertUrl + "/searchByAlert") + .with(httpBasic(user, password)).with(csrf()) + .contentType(MediaType.parseMediaType("text/plain;charset=UTF-8")) + .content(guid)); + result.andExpect(status().isOk()) + .andExpect( + content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) + .andExpect(jsonPath("$.total").value(1)) + .andExpect(jsonPath("$.results[0].source.guid").value("meta_2")) + .andExpect(jsonPath("$.results[0].source.count").value(3.0)); + + // Test multiple meta alerts + guid = "bro_1"; + result = this.mockMvc.perform( + post(metaalertUrl + "/searchByAlert") + .with(httpBasic(user, password)).with(csrf()) + .contentType(MediaType.parseMediaType("text/plain;charset=UTF-8")) + .content(guid)); + result.andExpect(status().isOk()) + .andExpect( + content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) + .andExpect(jsonPath("$.total").value(2)) + .andExpect(jsonPath("$.results[0].source.guid").value("meta_2")) + .andExpect(jsonPath("$.results[0].source.count").value(3.0)) + .andExpect(jsonPath("$.results[1].source.guid").value("meta_1")) + .andExpect(jsonPath("$.results[1].source.count").value(1.0)); + + result = this.mockMvc.perform( + post(metaalertUrl + "/create") + .with(httpBasic(user, password)).with(csrf()) + .contentType(MediaType.parseMediaType("application/json;charset=UTF-8")) + .content(create)); + result.andExpect(status().isOk()); + + // Test that we can find the newly created meta alert by the sub alerts + guid = "bro_1"; + result = this.mockMvc.perform( + post(metaalertUrl + "/searchByAlert") + .with(httpBasic(user, password)).with(csrf()) + .contentType(MediaType.parseMediaType("text/plain;charset=UTF-8")) + .content(guid)); + result.andExpect(status().isOk()) + .andExpect( + content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) + .andExpect(jsonPath("$.total").value(3)) + .andExpect(jsonPath("$.results[0].source.guid").value("meta_3")) + .andExpect(jsonPath("$.results[0].source.count").value(2.0)) + .andExpect(jsonPath("$.results[1].source.guid").value("meta_2")) + .andExpect(jsonPath("$.results[1].source.count").value(3.0)) + .andExpect(jsonPath("$.results[2].source.guid").value("meta_1")) + .andExpect(jsonPath("$.results[2].source.count").value(1.0)); + + guid = "snort_2"; + result = this.mockMvc.perform( + post(metaalertUrl + "/searchByAlert") + .with(httpBasic(user, password)).with(csrf()) + .contentType(MediaType.parseMediaType("text/plain;charset=UTF-8")) + .content(guid)); + result.andExpect(status().isOk()) + .andExpect( + content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) + .andExpect(jsonPath("$.total").value(1)) + .andExpect(jsonPath("$.results[0].source.guid").value("meta_3")) + .andExpect(jsonPath("$.results[0].source.count").value(2.0)); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/40c93527/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java index 645e525..ca7f209 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SearchControllerIntegrationTest.java @@ -17,6 +17,8 @@ */ package org.apache.metron.rest.controller; +import com.google.common.collect.ImmutableMap; +import org.apache.metron.hbase.mock.MockHBaseTableProvider; import org.apache.metron.indexing.dao.InMemoryDao; import org.apache.metron.indexing.dao.SearchIntegrationTest; import org.apache.metron.indexing.dao.search.FieldType; @@ -70,7 +72,11 @@ public class SearchControllerIntegrationTest extends DaoControllerTest { @Before public void setup() throws Exception { this.mockMvc = MockMvcBuilders.webAppContextSetup(this.wac).apply(springSecurity()).build(); - loadTestData(); + ImmutableMap<String, String> testData = ImmutableMap.of( + "bro_index_2017.01.01.01", SearchIntegrationTest.broData, + "snort_index_2017.01.01.01", SearchIntegrationTest.snortData + ); + loadTestData(testData); loadColumnTypes(); } http://git-wip-us.apache.org/repos/asf/metron/blob/40c93527/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java index 8955980..4708bc4 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/UpdateControllerIntegrationTest.java @@ -17,12 +17,15 @@ */ package org.apache.metron.rest.controller; +import com.google.common.collect.ImmutableMap; import org.adrianwalker.multilinestring.Multiline; import org.apache.curator.framework.CuratorFramework; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Result; import org.apache.metron.hbase.mock.MockHTable; import org.apache.metron.hbase.mock.MockHBaseTableProvider; +import org.apache.metron.indexing.dao.MetaAlertDao; +import org.apache.metron.indexing.dao.SearchIntegrationTest; import org.apache.metron.rest.service.UpdateService; import org.junit.Assert; import org.junit.Before; @@ -71,7 +74,7 @@ public class UpdateControllerIntegrationTest extends DaoControllerTest { /** { - "guid" : "bro_index_2017.01.01.01:1", + "guid" : "bro_2", "sensorType" : "bro" } */ @@ -80,7 +83,7 @@ public class UpdateControllerIntegrationTest extends DaoControllerTest { /** { - "guid" : "bro_index_2017.01.01.01:1", + "guid" : "bro_2", "sensorType" : "bro", "patch" : [ { @@ -96,11 +99,11 @@ public class UpdateControllerIntegrationTest extends DaoControllerTest { /** { - "guid" : "bro_index_2017.01.01.01:1", + "guid" : "bro_2", "sensorType" : "bro", "replacement" : { "source:type": "bro", - "guid" : "bro_index_2017.01.01.01:1", + "guid" : "bro_2", "ip_src_addr":"192.168.1.2", "ip_src_port": 8009, "timestamp":200, @@ -114,12 +117,17 @@ public class UpdateControllerIntegrationTest extends DaoControllerTest { @Before public void setup() throws Exception { this.mockMvc = MockMvcBuilders.webAppContextSetup(this.wac).apply(springSecurity()).build(); - loadTestData(); + ImmutableMap<String, String> testData = ImmutableMap.of( + "bro_index_2017.01.01.01", SearchIntegrationTest.broData, + "snort_index_2017.01.01.01", SearchIntegrationTest.snortData, + MetaAlertDao.METAALERTS_INDEX, SearchIntegrationTest.metaAlertData + ); + loadTestData(testData); } @Test public void test() throws Exception { - String guid = "bro_index_2017.01.01.01:1"; + String guid = "bro_2"; ResultActions result = this.mockMvc.perform(post(searchUrl + "/findOne").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(findMessage0)); try { result.andExpect(status().isOk()) http://git-wip-us.apache.org/repos/asf/metron/blob/40c93527/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java index 0d7a76c..0a06c80 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java @@ -45,6 +45,10 @@ import org.apache.metron.indexing.dao.search.GroupResult; import org.apache.metron.indexing.dao.search.InvalidSearchException; import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; +import org.elasticsearch.action.ActionWriteResponse.ShardInfo; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.*; +import org.elasticsearch.action.update.UpdateRequest; import org.apache.metron.indexing.dao.search.SearchResult; import org.apache.metron.indexing.dao.search.SortOrder; import org.apache.metron.indexing.dao.update.Document; @@ -52,6 +56,7 @@ import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.search.MultiSearchResponse; import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.client.transport.TransportClient; @@ -72,6 +77,24 @@ import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder; import org.elasticsearch.search.aggregations.metrics.sum.Sum; import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.search.sort.*; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import java.io.IOException; +import java.util.Arrays; +import java.util.Date; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; public class ElasticsearchDao implements IndexDao { private transient TransportClient client; @@ -105,6 +128,17 @@ public class ElasticsearchDao implements IndexDao { @Override public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException { + return search(searchRequest, new QueryStringQueryBuilder(searchRequest.getQuery())); + } + + /** + * Defers to a provided {@link org.elasticsearch.index.query.QueryBuilder} for the query. + * @param searchRequest The request defining the parameters of the search + * @param queryBuilder The actual query to be run. Intended for if the SearchRequest requires wrapping + * @return The results of the query + * @throws InvalidSearchException When the query is malformed or the current state doesn't allow search + */ + protected SearchResponse search(SearchRequest searchRequest, QueryBuilder queryBuilder) throws InvalidSearchException { if(client == null) { throw new InvalidSearchException("Uninitialized Dao! You must call init() prior to use."); } @@ -114,10 +148,10 @@ public class ElasticsearchDao implements IndexDao { SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder() .size(searchRequest.getSize()) .from(searchRequest.getFrom()) - .query(new QueryStringQueryBuilder(searchRequest.getQuery())) + .query(queryBuilder) + .trackScores(true); - searchRequest.getSort().forEach(sortField -> searchSourceBuilder.sort(sortField.getField(), getElasticsearchSortOrder(sortField.getSortOrder()))); - Optional<List<String>> fields = searchRequest.getFields(); + searchRequest.getSort().forEach(sortField -> searchSourceBuilder.sort(sortField.getField(), getElasticsearchSortOrder(sortField.getSortOrder())));Optional<List<String>> fields = searchRequest.getFields(); if (fields.isPresent()) { searchSourceBuilder.fields(fields.get()); } else { @@ -264,8 +298,19 @@ public class ElasticsearchDao implements IndexDao { .upsert(indexRequest) ; + org.elasticsearch.action.search.SearchResponse result = client.prepareSearch("test*").setFetchSource(true).setQuery(QueryBuilders.matchAllQuery()).get(); + result.getHits(); try { - client.update(updateRequest).get(); + UpdateResponse response = client.update(updateRequest).get(); + + ShardInfo shardInfo = response.getShardInfo(); + int failed = shardInfo.getFailed(); + if (failed > 0) { + throw new IOException("ElasticsearchDao upsert failed: " + Arrays.toString(shardInfo.getFailures())); + } + Thread.sleep(10000); + org.elasticsearch.action.search.SearchResponse resultAfter = client.prepareSearch("test*").setFetchSource(true).setQuery(QueryBuilders.matchAllQuery()).get(); + resultAfter.getHits(); } catch (Exception e) { throw new IOException(e.getMessage(), e); } @@ -438,6 +483,10 @@ public class ElasticsearchDao implements IndexDao { return String.format("%s_count", field); } + public TransportClient getClient() { + return client; + } + private String getGroupByAggregationName(String field) { return String.format("%s_group", field); } http://git-wip-us.apache.org/repos/asf/metron/blob/40c93527/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java new file mode 100644 index 0000000..cd6ed75 --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java @@ -0,0 +1,446 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.elasticsearch.dao; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.elasticsearch.index.query.QueryBuilders.boolQuery; +import static org.elasticsearch.index.query.QueryBuilders.constantScoreQuery; +import static org.elasticsearch.index.query.QueryBuilders.nestedQuery; +import static org.elasticsearch.index.query.QueryBuilders.termQuery; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; +import org.apache.metron.common.Constants; +import org.apache.metron.indexing.dao.AccessConfig; +import org.apache.metron.indexing.dao.IndexDao; +import org.apache.metron.indexing.dao.MetaAlertDao; +import org.apache.metron.indexing.dao.MultiIndexDao; +import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest; +import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse; +import org.apache.metron.indexing.dao.metaalert.MetaScores; +import org.apache.metron.indexing.dao.search.FieldType; +import org.apache.metron.indexing.dao.search.GroupRequest; +import org.apache.metron.indexing.dao.search.GroupResponse; +import org.apache.metron.indexing.dao.search.InvalidCreateException; +import org.apache.metron.indexing.dao.search.InvalidSearchException; +import org.apache.metron.indexing.dao.search.SearchRequest; +import org.apache.metron.indexing.dao.search.SearchResponse; +import org.apache.metron.indexing.dao.search.SearchResult; +import org.apache.metron.indexing.dao.update.Document; +import org.elasticsearch.action.ActionWriteResponse.ShardInfo; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.get.MultiGetItemResponse; +import org.elasticsearch.action.get.MultiGetRequest.Item; +import org.elasticsearch.action.get.MultiGetRequestBuilder; +import org.elasticsearch.action.get.MultiGetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateResponse; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryStringQueryBuilder; +import org.elasticsearch.index.query.support.QueryInnerHitBuilder; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; + +public class ElasticsearchMetaAlertDao implements MetaAlertDao { + + private IndexDao indexDao; + private ElasticsearchDao elasticsearchDao; + private String index = METAALERTS_INDEX; + private String threatTriageField = THREAT_FIELD_DEFAULT; + private String threatSort = THREAT_SORT_DEFAULT; + + /** + * Wraps an {@link org.apache.metron.indexing.dao.IndexDao} to handle meta alerts. + * @param indexDao The Dao to wrap + */ + public ElasticsearchMetaAlertDao(IndexDao indexDao) { + this(indexDao, METAALERTS_INDEX, THREAT_FIELD_DEFAULT, THREAT_SORT_DEFAULT); + } + + /** + * Wraps an {@link org.apache.metron.indexing.dao.IndexDao} to handle meta alerts. + * @param indexDao The Dao to wrap + * @param triageLevelField The field name to use as the threat scoring field + */ + public ElasticsearchMetaAlertDao(IndexDao indexDao, String index, String triageLevelField, + String threatSort) { + init(indexDao, threatSort); + this.index = index; + this.threatTriageField = triageLevelField; + } + + public ElasticsearchMetaAlertDao() { + //uninitialized. + } + + @Override + public void init(IndexDao indexDao, String threatSort) { + if (indexDao instanceof MultiIndexDao) { + this.indexDao = indexDao; + MultiIndexDao multiIndexDao = (MultiIndexDao) indexDao; + for (IndexDao childDao : multiIndexDao.getIndices()) { + if (childDao instanceof ElasticsearchDao) { + this.elasticsearchDao = (ElasticsearchDao) childDao; + } + } + } else if (indexDao instanceof ElasticsearchDao) { + this.indexDao = indexDao; + this.elasticsearchDao = (ElasticsearchDao) indexDao; + } else { + throw new IllegalArgumentException( + "Need an ElasticsearchDao when using ElasticsearchMetaAlertDao" + ); + } + + if (threatSort != null) { + this.threatSort = threatSort; + } + } + + @Override + public void init(AccessConfig config) { + // Do nothing. We're just wrapping a child dao + } + + @Override + public SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException { + if (guid == null || guid.trim().isEmpty()) { + throw new InvalidSearchException("Guid cannot be empty"); + } + org.elasticsearch.action.search.SearchResponse esResponse = getMetaAlertsForAlert(guid.trim()); + SearchResponse searchResponse = new SearchResponse(); + searchResponse.setTotal(esResponse.getHits().getTotalHits()); + searchResponse.setResults( + Arrays.stream(esResponse.getHits().getHits()).map(searchHit -> { + SearchResult searchResult = new SearchResult(); + searchResult.setId(searchHit.getId()); + searchResult.setSource(searchHit.getSource()); + searchResult.setScore(searchHit.getScore()); + searchResult.setIndex(searchHit.getIndex()); + return searchResult; + } + ).collect(Collectors.toList())); + return searchResponse; + } + + @Override + public MetaAlertCreateResponse createMetaAlert(MetaAlertCreateRequest request) + throws InvalidCreateException, IOException { + if (request.getGuidToIndices().isEmpty()) { + throw new InvalidCreateException("MetaAlertCreateRequest must contain alert GUIDs"); + } + if (request.getGroups().isEmpty()) { + throw new InvalidCreateException("MetaAlertCreateRequest must contain UI groups"); + } + + // Retrieve the documents going into the meta alert + MultiGetResponse multiGetResponse = getDocumentsByGuid(request); + Document createDoc = buildCreateDocument(multiGetResponse, request.getGroups()); + + try { + handleMetaUpdate(createDoc, Optional.of(METAALERTS_INDEX)); + MetaAlertCreateResponse createResponse = new MetaAlertCreateResponse(); + createResponse.setCreated(true); + return createResponse; + } catch (IOException ioe) { + throw new InvalidCreateException("Unable to create meta alert", ioe); + } + } + + @Override + public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException { + // Wrap the query to also get any meta-alerts. + QueryBuilder qb = constantScoreQuery(boolQuery() + .should(new QueryStringQueryBuilder(searchRequest.getQuery())) + .should(boolQuery() + .must(termQuery(MetaAlertDao.STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString())) + .must(nestedQuery( + ALERT_FIELD, + new QueryStringQueryBuilder(searchRequest.getQuery()) + ) + ) + ) + ); + return elasticsearchDao.search(searchRequest, qb); + } + + @Override + public Document getLatest(String guid, String sensorType) throws IOException { + return indexDao.getLatest(guid, sensorType); + } + + @Override + public void update(Document update, Optional<String> index) throws IOException { + if (METAALERT_TYPE.equals(update.getSensorType())) { + // We've been passed an update to the meta alert. + handleMetaUpdate(update, index); + } else { + // We need to update an alert itself. Only that portion of the update can be delegated. + // We still need to get meta alerts potentially associated with it and update. + org.elasticsearch.action.search.SearchResponse response = getMetaAlertsForAlert( + update.getGuid() + ); + + // Each hit, if any, is a metaalert that needs to be updated + for (SearchHit hit : response.getHits()) { + handleAlertUpdate(update, hit); + } + + // Run the alert's update + indexDao.update(update, index); + } + } + + /** + * Given an alert GUID, retrieve all associated meta alerts. + * @param guid The GUID of the child alert + * @return The Elasticsearch response containing the meta alerts + */ + protected org.elasticsearch.action.search.SearchResponse getMetaAlertsForAlert(String guid) { + QueryBuilder qb = boolQuery() + .must( + nestedQuery( + ALERT_FIELD, + boolQuery() + .must(termQuery(ALERT_FIELD + "." + Constants.GUID, guid)) + ).innerHit(new QueryInnerHitBuilder()) + ) + .must(termQuery(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString())); + SearchRequest sr = new SearchRequest(); + ArrayList<String> indices = new ArrayList<>(); + indices.add(index); + sr.setIndices(indices); + return elasticsearchDao + .getClient() + .prepareSearch(index) + .addFields("*") + .setFetchSource(true) + .setQuery(qb) + .execute() + .actionGet(); + } + + /** + * Return child documents after retrieving them from Elasticsearch. + * @param request The request detailing which child alerts we need + * @return The Elasticsearch response to our request for alerts + */ + protected MultiGetResponse getDocumentsByGuid(MetaAlertCreateRequest request) { + MultiGetRequestBuilder multiGet = elasticsearchDao.getClient().prepareMultiGet(); + for (Entry<String, String> entry : request.getGuidToIndices().entrySet()) { + multiGet.add(new Item(entry.getValue(), null, entry.getKey())); + } + return multiGet.get(); + } + + /** + * Build the Document representing a meta alert to be created. + * @param multiGetResponse The Elasticsearch results for the meta alerts child documents + * @param groups The groups used to create this meta alert + * @return A Document representing the new meta alert + */ + protected Document buildCreateDocument(MultiGetResponse multiGetResponse, List<String> groups) { + // Need to create a Document from the multiget. Scores will be calculated later + Map<String, Object> metaSource = new HashMap<>(); + List<Map<String, Object>> alertList = new ArrayList<>(); + for (MultiGetItemResponse itemResponse : multiGetResponse) { + GetResponse response = itemResponse.getResponse(); + if (response.isExists()) { + alertList.add(response.getSource()); + } + } + metaSource.put(ALERT_FIELD, alertList.toArray()); + + // Add any meta fields and score calculation. + String guid = UUID.randomUUID().toString(); + metaSource.put(Constants.GUID, guid); + metaSource.put(Constants.Fields.TIMESTAMP.getName(), System.currentTimeMillis()); + metaSource.put(GROUPS_FIELD, groups.toArray()); + metaSource.put(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString()); + + return new Document(metaSource, guid, METAALERT_TYPE, System.currentTimeMillis()); + } + + /** + * Process an update to a meta alert itself. + * @param update The update Document to be applied + * @param index The optional index to update to + * @throws IOException If there's a problem running the update + */ + protected void handleMetaUpdate(Document update, Optional<String> index) throws IOException { + // We have an update to a meta alert itself (e.g. adding a document, etc.) Calculate scores + // and defer the final result to the Elasticsearch DAO. + MetaScores metaScores = calculateMetaScores(update); + update.getDocument().putAll(metaScores.getMetaScores()); + update.getDocument().put(threatTriageField, metaScores.getMetaScores().get(threatSort)); + indexDao.update(update, index); + } + + /** + * Takes care of upserting a child alert to a meta alert. + * @param update The update Document to be applied + * @param hit The meta alert to be updated + * @throws IOException If there's an issue running the upsert + */ + protected void handleAlertUpdate(Document update, SearchHit hit) throws IOException { + XContentBuilder builder = buildUpdatedMetaAlert(update, hit); + + // Run the meta alert's update + IndexRequest indexRequest = new IndexRequest( + METAALERTS_INDEX, + METAALERT_DOC, + hit.getId() + ).source(builder); + UpdateRequest updateRequest = new UpdateRequest( + METAALERTS_INDEX, + METAALERT_DOC, + hit.getId() + ).doc(builder).upsert(indexRequest); + try { + UpdateResponse updateResponse = elasticsearchDao.getClient().update(updateRequest).get(); + + ShardInfo shardInfo = updateResponse.getShardInfo(); + int failed = shardInfo.getFailed(); + if (failed > 0) { + throw new IOException( + "ElasticsearchMetaAlertDao upsert failed: " + + Arrays.toString(shardInfo.getFailures()) + ); + } + } catch (Exception e) { + throw new IOException(e.getMessage(), e); + } + } + + @Override + public Map<String, Map<String, FieldType>> getColumnMetadata(List<String> indices) + throws IOException { + return indexDao.getColumnMetadata(indices); + } + + @Override + public Map<String, FieldType> getCommonColumnMetadata(List<String> indices) throws + IOException { + return indexDao.getCommonColumnMetadata(indices); + } + + @Override + public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException { + return indexDao.group(groupRequest); + } + + /** + * Calculate the meta alert scores for a Document. + * @param document The Document containing scores + * @return Set of score statistics + */ + @SuppressWarnings("unchecked") + protected MetaScores calculateMetaScores(Document document) { + List<Object> alertsRaw = ((List<Object>) document.getDocument().get(ALERT_FIELD)); + if (alertsRaw == null || alertsRaw.isEmpty()) { + throw new IllegalArgumentException("No alerts to use in calculation for doc GUID: " + + document.getDocument().get(Constants.GUID)); + } + + ArrayList<Double> scores = new ArrayList<>(); + for (Object alertRaw : alertsRaw) { + Map<String, Object> alert = (Map<String, Object>) alertRaw; + Double scoreNum = parseThreatField(alert.get(threatTriageField)); + if (scoreNum != null) { + scores.add(scoreNum); + } + } + + return new MetaScores(scores); + } + + /** + * Builds the updated meta alert based on the update. + * @param update The update Document for the meta alert + * @param hit The meta alert to be updated + * @return A builder for Elasticsearch to use + * @throws IOException If we have an issue building the result + */ + protected XContentBuilder buildUpdatedMetaAlert(Document update, SearchHit hit) + throws IOException { + // Make sure to get all the threat scores while we're going through the docs + List<Double> scores = new ArrayList<>(); + // Start building the new version of the metaalert + XContentBuilder builder = jsonBuilder().startObject(); + + // Run through the nested alerts of the meta alert and either use the new or old versions + builder.startArray(ALERT_FIELD); + Map<String, SearchHits> innerHits = hit.getInnerHits(); + + SearchHits alertHits = innerHits.get(ALERT_FIELD); + for (SearchHit alertHit : alertHits.getHits()) { + Map<String, Object> docMap; + // If we're at the update use it, otherwise use the original + if (alertHit.sourceAsMap().get(Constants.GUID).equals(update.getGuid())) { + docMap = update.getDocument(); + } else { + docMap = alertHit.getSource(); + } + builder.map(docMap); + + // Handle either String or Number values in the threatTriageField + Object threatRaw = docMap.get(threatTriageField); + Double threat = parseThreatField(threatRaw); + + if (threat != null) { + scores.add(threat); + } + } + builder.endArray(); + + // Add all the meta alert fields, and score calculation + Map<String, Object> updatedMeta = new HashMap<>(); + updatedMeta.putAll(hit.getSource()); + updatedMeta.putAll(new MetaScores(scores).getMetaScores()); + for (Entry<String, Object> entry : updatedMeta.entrySet()) { + // The alerts field is being added separately, so ignore the original + if (!(entry.getKey().equals(ALERT_FIELD))) { + builder.field(entry.getKey(), entry.getValue()); + } + } + builder.endObject(); + + return builder; + } + + private Double parseThreatField(Object threatRaw) { + Double threat = null; + if (threatRaw instanceof Number) { + threat = ((Number) threatRaw).doubleValue(); + } else if (threatRaw instanceof String) { + threat = Double.parseDouble((String) threatRaw); + } + return threat; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/40c93527/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/MetaAlertStatus.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/MetaAlertStatus.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/MetaAlertStatus.java new file mode 100644 index 0000000..6c8e858 --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/MetaAlertStatus.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.elasticsearch.dao; + +public enum MetaAlertStatus { + ACTIVE("active"), + INACTIVE("inactive"); + + private String statusString; + + MetaAlertStatus(String statusString) { + this.statusString = statusString; + } + + public String getStatusString() { + return statusString; + } +}
