Repository: metron Updated Branches: refs/heads/master 42fc699ce -> 89a2beda4
METRON-1845 Correct Test Data Load in Elasticsearch Integration Tests (nickwallen) closes apache/metron#1247 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/89a2beda Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/89a2beda Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/89a2beda Branch: refs/heads/master Commit: 89a2beda4f07911c8b3cd7dee8a2c3426838d161 Parents: 42fc699 Author: nickwallen <[email protected]> Authored: Wed Nov 28 20:10:50 2018 -0500 Committer: nickallen <[email protected]> Committed: Wed Nov 28 20:10:50 2018 -0500 ---------------------------------------------------------------------- .../elasticsearch/dao/ElasticsearchDao.java | 16 ++- .../dao/ElasticsearchUpdateDao.java | 9 ++ .../ElasticsearchIndexingIntegrationTest.java | 15 ++- .../ElasticsearchMetaAlertIntegrationTest.java | 78 +++++++------- .../ElasticsearchSearchIntegrationTest.java | 106 ++++++++----------- .../ElasticsearchUpdateIntegrationTest.java | 68 ++++++------ .../components/ElasticSearchComponent.java | 96 ++++++++++++----- .../dao/metaalert/MetaAlertIntegrationTest.java | 3 +- 8 files changed, 227 insertions(+), 164 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/89a2beda/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java index 210e1ce..dcd6fdb 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java +++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java @@ -39,6 +39,7 @@ import org.apache.metron.indexing.dao.update.Document; import org.apache.metron.indexing.dao.update.OriginalNotFoundException; import org.apache.metron.indexing.dao.update.PatchRequest; import org.apache.metron.indexing.dao.update.ReplaceRequest; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.index.query.QueryBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,8 +64,9 @@ public class ElasticsearchDao implements IndexDao { private ElasticsearchRequestSubmitter requestSubmitter; private AccessConfig accessConfig; + private WriteRequest.RefreshPolicy refreshPolicy; - protected ElasticsearchDao(ElasticsearchClient client, + public ElasticsearchDao(ElasticsearchClient client, AccessConfig config, ElasticsearchSearchDao searchDao, ElasticsearchUpdateDao updateDao, @@ -83,6 +85,7 @@ public class ElasticsearchDao implements IndexDao { public ElasticsearchDao() { //uninitialized. 
+ refreshPolicy = WriteRequest.RefreshPolicy.NONE; } public AccessConfig getAccessConfig() { @@ -100,10 +103,10 @@ public class ElasticsearchDao implements IndexDao { this.accessConfig = config; this.columnMetadataDao = new ElasticsearchColumnMetadataDao(this.client); this.requestSubmitter = new ElasticsearchRequestSubmitter(this.client); - this.searchDao = new ElasticsearchSearchDao(client, accessConfig, columnMetadataDao, - requestSubmitter); + this.searchDao = new ElasticsearchSearchDao(client, accessConfig, columnMetadataDao, requestSubmitter); this.retrieveLatestDao = new ElasticsearchRetrieveLatestDao(client); - this.updateDao = new ElasticsearchUpdateDao(client, accessConfig, retrieveLatestDao); + this.updateDao = new ElasticsearchUpdateDao(client, accessConfig, retrieveLatestDao) + .withRefreshPolicy(refreshPolicy); } if (columnMetadataDao == null) { @@ -187,6 +190,11 @@ public class ElasticsearchDao implements IndexDao { return this.updateDao.removeCommentFromAlert(request, latest); } + public ElasticsearchDao withRefreshPolicy(WriteRequest.RefreshPolicy refreshPolicy) { + this.refreshPolicy = refreshPolicy; + return this; + } + protected Optional<String> getIndexName(String guid, String sensorType) throws IOException { return updateDao.getIndexName(guid, sensorType); } http://git-wip-us.apache.org/repos/asf/metron/blob/89a2beda/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java index c769b2f..ba852aa 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java +++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java @@ -40,6 +40,7 @@ import org.elasticsearch.action.bulk.BulkRequest; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,6 +52,7 @@ public class ElasticsearchUpdateDao implements UpdateDao { private transient ElasticsearchClient client; private AccessConfig accessConfig; private ElasticsearchRetrieveLatestDao retrieveLatestDao; + private WriteRequest.RefreshPolicy refreshPolicy; public ElasticsearchUpdateDao(ElasticsearchClient client, AccessConfig accessConfig, @@ -58,6 +60,7 @@ public class ElasticsearchUpdateDao implements UpdateDao { this.client = client; this.accessConfig = accessConfig; this.retrieveLatestDao = searchDao; + this.refreshPolicy = WriteRequest.RefreshPolicy.NONE; } @Override @@ -89,6 +92,7 @@ public class ElasticsearchUpdateDao implements UpdateDao { .getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date()); BulkRequest bulkRequestBuilder = new BulkRequest(); + bulkRequestBuilder.setRefreshPolicy(refreshPolicy); // Get the indices we'll actually be using for each Document. 
for (Map.Entry<Document, Optional<String>> updateEntry : updates.entrySet()) { @@ -182,6 +186,11 @@ public class ElasticsearchUpdateDao implements UpdateDao { return update(newVersion, Optional.empty()); } + public ElasticsearchUpdateDao withRefreshPolicy(WriteRequest.RefreshPolicy refreshPolicy) { + this.refreshPolicy = refreshPolicy; + return this; + } + protected String getIndexName(Document update, Optional<String> index, String indexPostFix) throws IOException { return index.orElse(getIndexName(update.getGuid(), update.getSensorType()) .orElse(ElasticsearchUtils.getIndexName(update.getSensorType(), indexPostFix, null)) http://git-wip-us.apache.org/repos/asf/metron/blob/89a2beda/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java index df5e96a..2784db3 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java @@ -18,10 +18,10 @@ package org.apache.metron.elasticsearch.integration; import org.adrianwalker.multilinestring.Multiline; -import org.apache.metron.common.field.DeDotFieldNameConverter; import org.apache.metron.common.field.FieldNameConverter; import org.apache.metron.common.field.FieldNameConverters; import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent; +import org.apache.metron.indexing.dao.AccessConfig; import 
org.apache.metron.indexing.integration.IndexingIntegrationTest; import org.apache.metron.integration.ComponentRunner; import org.apache.metron.integration.InMemoryComponent; @@ -34,6 +34,7 @@ import java.io.File; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -67,10 +68,22 @@ public class ElasticsearchIndexingIntegrationTest extends IndexingIntegrationTes @Override public InMemoryComponent getSearchComponent(final Properties topologyProperties) { + Map<String, Object> globalConfig = new HashMap<String, Object>() { + { + put("es.clustername", "metron"); + put("es.port", "9200"); + put("es.ip", "localhost"); + put("es.date.format", dateFormat); + } + }; + AccessConfig accessConfig = new AccessConfig(); + accessConfig.setGlobalConfigSupplier(() -> globalConfig); + return new ElasticSearchComponent.Builder() .withHttpPort(9211) .withIndexDir(new File(indexDir)) .withMapping(index, "yaf_doc", mapping) + .withAccessConfig(accessConfig) .build(); } http://git-wip-us.apache.org/repos/asf/metron/blob/89a2beda/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java index 03b1639..cba0f65 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java @@ -19,28 +19,7 @@ 
package org.apache.metron.elasticsearch.integration; -import static org.apache.metron.elasticsearch.dao.ElasticsearchMetaAlertDao.METAALERTS_INDEX; -import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.ALERT_FIELD; -import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_DOC; -import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_FIELD; -import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_TYPE; - import com.fasterxml.jackson.core.JsonProcessingException; -import java.io.File; -import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.function.Function; -import java.util.stream.Collectors; - import com.google.common.collect.ImmutableList; import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.common.Constants; @@ -50,13 +29,13 @@ import org.apache.metron.elasticsearch.dao.ElasticsearchMetaAlertDao; import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent; import org.apache.metron.indexing.dao.AccessConfig; import org.apache.metron.indexing.dao.IndexDao; -import org.apache.metron.indexing.dao.metaalert.MetaAlertDao; import org.apache.metron.indexing.dao.metaalert.MetaAlertIntegrationTest; import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus; import org.apache.metron.indexing.dao.search.GetRequest; import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; import org.apache.metron.indexing.dao.search.SortField; +import org.json.simple.parser.ParseException; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -66,11 +45,33 @@ import org.junit.Test; 
import org.junit.runner.RunWith; import org.junit.runners.Parameterized; +import java.io.File; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.apache.metron.elasticsearch.dao.ElasticsearchMetaAlertDao.METAALERTS_INDEX; +import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.ALERT_FIELD; +import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_DOC; +import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_FIELD; +import static org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_TYPE; + @RunWith(Parameterized.class) public class ElasticsearchMetaAlertIntegrationTest extends MetaAlertIntegrationTest { private static IndexDao esDao; private static ElasticSearchComponent es; + private static AccessConfig accessConfig; protected static final String INDEX_DIR = "target/elasticsearch_meta"; private static String POSTFIX= new SimpleDateFormat(DATE_FORMAT).format(new Date()); @@ -127,10 +128,25 @@ public class ElasticsearchMetaAlertIntegrationTest extends MetaAlertIntegrationT // Ensure ES can retry as needed. 
MAX_RETRIES = 10; - // setup the client + Map<String, Object> globalConfig = new HashMap<String, Object>() { + { + put("es.clustername", "metron"); + put("es.port", "9200"); + put("es.ip", "localhost"); + put("es.date.format", DATE_FORMAT); + } + }; + + accessConfig = new AccessConfig(); + accessConfig.setMaxSearchResults(1000); + accessConfig.setMaxSearchGroups(100); + accessConfig.setGlobalConfigSupplier(() -> globalConfig); + + // start elasticsearch es = new ElasticSearchComponent.Builder() .withHttpPort(9211) .withIndexDir(new File(INDEX_DIR)) + .withAccessConfig(accessConfig) .build(); es.start(); } @@ -140,21 +156,9 @@ public class ElasticsearchMetaAlertIntegrationTest extends MetaAlertIntegrationT es.createIndexWithMapping(METAALERTS_INDEX, METAALERT_DOC, template.replace("%MAPPING_NAME%", METAALERT_TYPE)); es.createIndexWithMapping(INDEX, "test_doc", template.replace("%MAPPING_NAME%", "test")); - AccessConfig accessConfig = new AccessConfig(); - Map<String, Object> globalConfig = new HashMap<String, Object>() { - { - put("es.clustername", "metron"); - put("es.port", "9200"); - put("es.ip", "localhost"); - put("es.date.format", DATE_FORMAT); - } - }; - accessConfig.setMaxSearchResults(1000); - accessConfig.setGlobalConfigSupplier(() -> globalConfig); - accessConfig.setMaxSearchGroups(100); - esDao = new ElasticsearchDao(); esDao.init(accessConfig); + ElasticsearchMetaAlertDao elasticsearchMetaDao = new ElasticsearchMetaAlertDao(esDao); elasticsearchMetaDao.setPageSize(5); metaDao = elasticsearchMetaDao; @@ -321,7 +325,7 @@ public class ElasticsearchMetaAlertIntegrationTest extends MetaAlertIntegrationT @Override protected void addRecords(List<Map<String, Object>> inputData, String index, String docType) - throws IOException { + throws IOException, ParseException { es.add(index, docType, inputData.stream().map(m -> { try { return JSONUtils.INSTANCE.toJSON(m, true); 
http://git-wip-us.apache.org/repos/asf/metron/blob/89a2beda/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java index d03da0e..7036078 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java @@ -18,17 +18,6 @@ package org.apache.metron.elasticsearch.integration; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertThat; - -import java.io.File; -import java.io.IOException; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import org.apache.http.HttpEntity; import org.apache.http.entity.ContentType; import org.apache.http.nio.entity.NStringEntity; @@ -48,23 +37,33 @@ import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; import org.apache.metron.indexing.dao.search.SearchResult; import org.apache.metron.integration.InMemoryComponent; -import org.elasticsearch.action.bulk.BulkRequest; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.client.Response; import org.elasticsearch.client.RestClient; import 
org.elasticsearch.client.RestHighLevelClient; import org.json.simple.JSONArray; import org.json.simple.JSONObject; import org.json.simple.parser.JSONParser; -import org.json.simple.parser.ParseException; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.junit.Assert.assertThat; public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static String indexDir = "target/elasticsearch_search"; private static String dateFormat = "yyyy.MM.dd.HH"; private static String broTemplatePath = "../../metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template"; @@ -72,48 +71,51 @@ public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { protected static final String BRO_INDEX = "bro_index_2017.01.01.01"; protected static final String SNORT_INDEX = "snort_index_2017.01.01.02"; protected static Map<String, Object> globalConfig; + protected static AccessConfig accessConfig; protected static RestClient lowLevelClient; protected static RestHighLevelClient highLevelClient; protected static IndexDao dao; @BeforeClass public static void setup() throws Exception { - indexComponent = startIndex(); - globalConfig = new HashMap<String, Object>() {{ + globalConfig = new HashMap<String, Object>() {{ put("es.clustername", "metron"); put("es.port", "9200"); put("es.ip", "localhost"); put("es.date.format", dateFormat); }}; - ElasticsearchClient esClient = 
ElasticsearchClientFactory.create(globalConfig); - lowLevelClient = esClient.getLowLevelClient(); - highLevelClient = esClient.getHighLevelClient(); - dao = createDao(globalConfig); - // The data is all static for searches, so we can set it up beforehand, and it's faster - loadTestData(); - } - protected static IndexDao createDao(Map<String, Object> globalConfig) { - AccessConfig accessConfig = new AccessConfig(); + accessConfig = new AccessConfig(); accessConfig.setMaxSearchResults(100); accessConfig.setMaxSearchGroups(100); accessConfig.setGlobalConfigSupplier(() -> globalConfig); - IndexDao dao = new ElasticsearchDao(); + indexComponent = startIndex(); + + ElasticsearchClient esClient = ElasticsearchClientFactory.create(globalConfig); + lowLevelClient = esClient.getLowLevelClient(); + highLevelClient = esClient.getHighLevelClient(); + + dao = new ElasticsearchDao(); dao.init(accessConfig); - return dao; + + // The data is all static for searches, so we can set it up beforehand, and it's faster + loadTestData(); } protected static InMemoryComponent startIndex() throws Exception { InMemoryComponent es = new ElasticSearchComponent.Builder() .withHttpPort(9211) .withIndexDir(new File(indexDir)) + .withAccessConfig(accessConfig) .build(); es.start(); return es; } - protected static void loadTestData() throws ParseException, IOException { + protected static void loadTestData() throws Exception { + ElasticSearchComponent es = (ElasticSearchComponent) indexComponent; + // add bro template JSONObject broTemplate = JSONUtils.INSTANCE.load(new File(broTemplatePath), JSONObject.class); addTestFieldMappings(broTemplate, "bro_doc"); @@ -121,6 +123,7 @@ public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { HttpEntity broEntity = new NStringEntity(broTemplateJson, ContentType.APPLICATION_JSON); Response response = lowLevelClient.performRequest("PUT", "/_template/bro_template", Collections.emptyMap(), broEntity); 
assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); + // add snort template JSONObject snortTemplate = JSONUtils.INSTANCE.load(new File(snortTemplatePath), JSONObject.class); addTestFieldMappings(snortTemplate, "snort_doc"); @@ -128,42 +131,28 @@ public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { HttpEntity snortEntity = new NStringEntity(snortTemplateJson, ContentType.APPLICATION_JSON); response = lowLevelClient.performRequest("PUT", "/_template/snort_template", Collections.emptyMap(), snortEntity); assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); + // create bro index response = lowLevelClient.performRequest("PUT", BRO_INDEX); assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); + // create snort index response = lowLevelClient.performRequest("PUT", SNORT_INDEX); assertThat(response.getStatusLine().getStatusCode(), equalTo(200)); - JSONArray broRecords = (JSONArray) new JSONParser().parse(broData); - - BulkRequest bulkRequest = new BulkRequest(); - for (Object o : broRecords) { - JSONObject json = (JSONObject) o; - IndexRequest indexRequest = new IndexRequest(BRO_INDEX, "bro_doc", (String) json.get("guid")); - indexRequest.source(json); - indexRequest.timestamp(json.get("timestamp").toString()); - bulkRequest.add(indexRequest); + // write the test documents for Bro + List<String> broDocuments = new ArrayList<>(); + for (Object broObject: (JSONArray) new JSONParser().parse(broData)) { + broDocuments.add(((JSONObject) broObject).toJSONString()); } - bulkRequest.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL); - BulkResponse bulkResponse = highLevelClient.bulk(bulkRequest); - assertFalse(bulkResponse.hasFailures()); - assertThat(bulkResponse.status().getStatus(), equalTo(200)); - - JSONArray snortRecords = (JSONArray) new JSONParser().parse(snortData); - - bulkRequest = new BulkRequest(); - for (Object o : snortRecords) { - JSONObject json = (JSONObject) o; - IndexRequest indexRequest = new 
IndexRequest(SNORT_INDEX, "snort_doc", (String) json.get("guid")); - indexRequest.source(json); - indexRequest.timestamp(json.get("timestamp").toString()); - bulkRequest.add(indexRequest); + es.add(BRO_INDEX, "bro", broDocuments); + + // write the test documents for Snort + List<String> snortDocuments = new ArrayList<>(); + for (Object snortObject: (JSONArray) new JSONParser().parse(snortData)) { + snortDocuments.add(((JSONObject) snortObject).toJSONString()); } - bulkRequest.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL); - bulkResponse = highLevelClient.bulk(bulkRequest); - assertFalse(bulkResponse.hasFailures()); - assertThat(bulkResponse.status().getStatus(), equalTo(200)); + es.add(SNORT_INDEX, "snort", snortDocuments); } /** @@ -201,7 +190,6 @@ public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { public void returns_column_metadata_for_specified_indices() throws Exception { // getColumnMetadata with only bro { - Assert.assertEquals(262, dao.getColumnMetadata(Collections.singletonList("bro")).size()); Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Collections.singletonList("bro")); Assert.assertEquals(262, fieldTypes.size()); Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("method")); @@ -220,7 +208,6 @@ public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { } // getColumnMetadata with only snort { - Assert.assertEquals(32, dao.getColumnMetadata(Collections.singletonList("snort")).size()); Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Collections.singletonList("snort")); Assert.assertEquals(32, fieldTypes.size()); Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("sig_generator")); @@ -242,7 +229,6 @@ public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { @Override public void returns_column_data_for_multiple_indices() throws Exception { - Assert.assertEquals(277, dao.getColumnMetadata(Arrays.asList("bro", "snort")).size()); Map<String, FieldType> 
fieldTypes = dao.getColumnMetadata(Arrays.asList("bro", "snort")); Assert.assertEquals(277, fieldTypes.size()); http://git-wip-us.apache.org/repos/asf/metron/blob/89a2beda/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java index 6f36790..6489206 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java @@ -19,13 +19,6 @@ package org.apache.metron.elasticsearch.integration; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.Iterables; -import java.io.File; -import java.io.IOException; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.metron.common.utils.JSONUtils; @@ -44,6 +37,14 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; +import java.io.File; +import java.io.IOException; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + public class ElasticsearchUpdateIntegrationTest extends UpdateIntegrationTest { private static final String SENSOR_NAME= "test"; private static String indexDir = "target/elasticsearch_mutation"; @@ -55,6 +56,9 @@ public class 
ElasticsearchUpdateIntegrationTest extends UpdateIntegrationTest { private static final String CF = "p"; private static MockHTable table; private static IndexDao hbaseDao; + private static IndexDao elasticsearchDao; + private static AccessConfig accessConfig; + private static Map<String, Object> globalConfig; @Override protected String getIndexName() { @@ -62,30 +66,37 @@ public class ElasticsearchUpdateIntegrationTest extends UpdateIntegrationTest { } @BeforeClass - public static void setupBeforeClass() throws UnableToStartException { - es = new ElasticSearchComponent.Builder() - .withHttpPort(9211) - .withIndexDir(new File(indexDir)) - .build(); - es.start(); - } - - @Before - public void setup() throws IOException { + public static void setupBeforeClass() throws UnableToStartException, IOException { Configuration config = HBaseConfiguration.create(); MockHBaseTableProvider tableProvider = new MockHBaseTableProvider(); MockHBaseTableProvider.addToCache(TABLE_NAME, CF); table = (MockHTable) tableProvider.getTable(config, TABLE_NAME); - hbaseDao = new HBaseDao(); - AccessConfig accessConfig = new AccessConfig(); - accessConfig.setTableProvider(tableProvider); - Map<String, Object> globalConfig = createGlobalConfig(); + globalConfig = new HashMap<>(); + globalConfig.put("es.clustername", "metron"); + globalConfig.put("es.port", "9200"); + globalConfig.put("es.ip", "localhost"); + globalConfig.put("es.date.format", dateFormat); globalConfig.put(HBaseDao.HBASE_TABLE, TABLE_NAME); globalConfig.put(HBaseDao.HBASE_CF, CF); + + accessConfig = new AccessConfig(); + accessConfig.setTableProvider(tableProvider); accessConfig.setGlobalConfigSupplier(() -> globalConfig); - MultiIndexDao dao = new MultiIndexDao(hbaseDao, createDao()); + es = new ElasticSearchComponent.Builder() + .withHttpPort(9211) + .withIndexDir(new File(indexDir)) + .withAccessConfig(accessConfig) + .build(); + es.start(); + } + + @Before + public void setup() { + hbaseDao = new HBaseDao(); + 
elasticsearchDao = new ElasticsearchDao(); + MultiIndexDao dao = new MultiIndexDao(hbaseDao, elasticsearchDao); dao.init(accessConfig); setDao(dao); } @@ -101,19 +112,6 @@ public class ElasticsearchUpdateIntegrationTest extends UpdateIntegrationTest { es.stop(); } - protected static Map<String, Object> createGlobalConfig() { - return new HashMap<String, Object>() {{ - put("es.clustername", "metron"); - put("es.port", "9200"); - put("es.ip", "localhost"); - put("es.date.format", dateFormat); - }}; - } - - protected static IndexDao createDao() { - return new ElasticsearchDao(); - } - @Override protected void addTestData(String indexName, String sensorType, List<Map<String, Object>> docs) throws Exception { http://git-wip-us.apache.org/repos/asf/metron/blob/89a2beda/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java index 3e14c00..227f5ef 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java @@ -24,15 +24,36 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.BooleanSupplier; + +import com.google.common.util.concurrent.Uninterruptibles; import 
org.apache.commons.io.FileUtils; import org.apache.metron.common.Constants; import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.elasticsearch.client.ElasticsearchClient; +import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory; +import org.apache.metron.elasticsearch.dao.ElasticsearchColumnMetadataDao; +import org.apache.metron.elasticsearch.dao.ElasticsearchDao; +import org.apache.metron.elasticsearch.dao.ElasticsearchRequestSubmitter; +import org.apache.metron.elasticsearch.dao.ElasticsearchRetrieveLatestDao; +import org.apache.metron.elasticsearch.dao.ElasticsearchSearchDao; +import org.apache.metron.elasticsearch.dao.ElasticsearchUpdateDao; +import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; +import org.apache.metron.indexing.dao.AccessConfig; +import org.apache.metron.indexing.dao.IndexDao; +import org.apache.metron.indexing.dao.search.InvalidSearchException; +import org.apache.metron.indexing.dao.search.SearchRequest; +import org.apache.metron.indexing.dao.update.Document; +import org.apache.metron.indexing.dao.update.UpdateDao; import org.apache.metron.integration.InMemoryComponent; import org.apache.metron.integration.UnableToStartException; +import org.apache.metron.stellar.common.utils.ConversionUtils; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; @@ -45,6 +66,7 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.client.Client; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.common.settings.Settings; @@ -54,6 +76,10 @@ import 
org.elasticsearch.node.NodeValidationException; import org.elasticsearch.plugins.Plugin; import org.elasticsearch.search.SearchHit; import org.elasticsearch.transport.Netty4Plugin; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; public class ElasticSearchComponent implements InMemoryComponent { @@ -75,6 +101,7 @@ public class ElasticSearchComponent implements InMemoryComponent { private File indexDir; private Map<String, String> extraElasticSearchSettings = null; private List<Mapping> mappings = new ArrayList<>(); + private AccessConfig accessConfig = new AccessConfig(); public Builder withMapping(String index, String docType, String mapping) { mappings.add(new Mapping(index, docType, mapping)); @@ -97,8 +124,13 @@ public class ElasticSearchComponent implements InMemoryComponent { return this; } + public Builder withAccessConfig(AccessConfig accessConfig) { + this.accessConfig = accessConfig; + return this; + } + public ElasticSearchComponent build() { - return new ElasticSearchComponent(httpPort, indexDir, extraElasticSearchSettings, mappings); + return new ElasticSearchComponent(httpPort, indexDir, extraElasticSearchSettings, mappings, accessConfig); } } @@ -109,13 +141,16 @@ public class ElasticSearchComponent implements InMemoryComponent { private File indexDir; private Map<String, String> extraElasticSearchSettings; private List<Mapping> mappings; + private AccessConfig accessConfig; + private IndexDao indexDao; public ElasticSearchComponent(int httpPort, File indexDir, - Map<String, String> extraElasticSearchSettings, List<Mapping> mappings) { + Map<String, String> extraElasticSearchSettings, List<Mapping> mappings, AccessConfig accessConfig) { this.httpPort = httpPort; this.indexDir = indexDir; this.extraElasticSearchSettings = extraElasticSearchSettings; this.mappings = mappings; + this.accessConfig = accessConfig; } @Override @@ -153,6 +188,10 
@@ public class ElasticSearchComponent implements InMemoryComponent { client.admin().indices().prepareCreate(m.index) .addMapping(m.docType, m.mapping).get(); } + + indexDao = new ElasticsearchDao() + .withRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); + indexDao.init(accessConfig); } private void cleanDir(File dir) throws IOException { @@ -194,35 +233,41 @@ public class ElasticSearchComponent implements InMemoryComponent { return client; } - public BulkResponse add(String indexName, String sensorType, String... docs) throws IOException { + public void add(String indexName, String sensorType, String... docs) + throws IOException, ParseException { List<String> d = new ArrayList<>(); Collections.addAll(d, docs); - return add(indexName, sensorType, d); + add(indexName, sensorType, d); } - public BulkResponse add(String indexName, String sensorType, Iterable<String> docs) - throws IOException { - BulkRequestBuilder bulkRequest = getClient().prepareBulk(); - for (String doc : docs) { - IndexRequestBuilder indexRequestBuilder = getClient() - .prepareIndex(indexName, sensorType + "_doc"); - - indexRequestBuilder = indexRequestBuilder.setSource(doc); - Map<String, Object> esDoc = JSONUtils.INSTANCE - .load(doc, JSONUtils.MAP_SUPPLIER); - indexRequestBuilder.setId((String) esDoc.get(Constants.GUID)); - Object ts = esDoc.get("timestamp"); - if (ts != null) { - indexRequestBuilder = indexRequestBuilder.setTimestamp(ts.toString()); - } - bulkRequest.add(indexRequestBuilder); - } + public void add(String indexName, String sensorType, Iterable<String> docs) + throws IOException, ParseException { - BulkResponse response = bulkRequest.execute().actionGet(); - if (response.hasFailures()) { - throw new IOException(response.buildFailureMessage()); + // create a collection of indexable documents + JSONParser parser = new JSONParser(); + Map<Document, Optional<String>> documents = new HashMap<>(); + for(String json: docs) { + JSONObject message = (JSONObject) parser.parse(json); 
+ documents.put(createDocument(message, sensorType), Optional.of(indexName)); } - return response; + + // write the documents + indexDao.batchUpdate(documents); + } + + /** + * Create an indexable Document from a JSON message. + * + * @param message The JSON message that needs to be indexed. + * @param docType The document type to write. + * @return The {@link Document} created from the message. + * @throws IOException + */ + private static Document createDocument(JSONObject message, String docType) throws IOException { + Long timestamp = ConversionUtils.convert(message.get("timestamp"), Long.class); + String source = message.toJSONString(); + String guid = (String) message.get("guid"); + return new Document(source, guid, docType, timestamp); } public void createIndexWithMapping(String indexName, String mappingType, String mappingSource) @@ -246,7 +291,6 @@ public class ElasticSearchComponent implements InMemoryComponent { getClient().admin().indices().refresh(new RefreshRequest()); SearchResponse response = getClient().prepareSearch(index) .setTypes(sourceType) -// .setSource("message") ?? 
.setFrom(0) .setSize(1000) .execute().actionGet(); http://git-wip-us.apache.org/repos/asf/metron/blob/89a2beda/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java index 24989b4..90bee80 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java @@ -55,6 +55,7 @@ import org.apache.metron.indexing.dao.search.SortOrder; import org.apache.metron.indexing.dao.update.Document; import org.apache.metron.indexing.dao.update.OriginalNotFoundException; import org.apache.metron.indexing.dao.update.PatchRequest; +import org.json.simple.parser.ParseException; import org.junit.Assert; import org.junit.Test; @@ -1090,7 +1091,7 @@ public abstract class MetaAlertIntegrationTest { throws IOException, InterruptedException; protected abstract void addRecords(List<Map<String, Object>> inputData, String index, - String docType) throws IOException; + String docType) throws IOException, ParseException; protected abstract long getMatchingMetaAlertCount(String fieldName, String fieldValue) throws IOException, InterruptedException;
