Repository: metron
Updated Branches:
  refs/heads/master 42fc699ce -> 89a2beda4


METRON-1845 Correct Test Data Load in Elasticsearch Integration Tests 
(nickwallen) closes apache/metron#1247


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/89a2beda
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/89a2beda
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/89a2beda

Branch: refs/heads/master
Commit: 89a2beda4f07911c8b3cd7dee8a2c3426838d161
Parents: 42fc699
Author: nickwallen <[email protected]>
Authored: Wed Nov 28 20:10:50 2018 -0500
Committer: nickallen <[email protected]>
Committed: Wed Nov 28 20:10:50 2018 -0500

----------------------------------------------------------------------
 .../elasticsearch/dao/ElasticsearchDao.java     |  16 ++-
 .../dao/ElasticsearchUpdateDao.java             |   9 ++
 .../ElasticsearchIndexingIntegrationTest.java   |  15 ++-
 .../ElasticsearchMetaAlertIntegrationTest.java  |  78 +++++++-------
 .../ElasticsearchSearchIntegrationTest.java     | 106 ++++++++-----------
 .../ElasticsearchUpdateIntegrationTest.java     |  68 ++++++------
 .../components/ElasticSearchComponent.java      |  96 ++++++++++++-----
 .../dao/metaalert/MetaAlertIntegrationTest.java |   3 +-
 8 files changed, 227 insertions(+), 164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/89a2beda/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
index 210e1ce..dcd6fdb 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
@@ -39,6 +39,7 @@ import org.apache.metron.indexing.dao.update.Document;
 import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
 import org.apache.metron.indexing.dao.update.PatchRequest;
 import org.apache.metron.indexing.dao.update.ReplaceRequest;
+import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,8 +64,9 @@ public class ElasticsearchDao implements IndexDao {
   private ElasticsearchRequestSubmitter requestSubmitter;
 
   private AccessConfig accessConfig;
+  private WriteRequest.RefreshPolicy refreshPolicy;
 
-  protected ElasticsearchDao(ElasticsearchClient client,
+  public ElasticsearchDao(ElasticsearchClient client,
       AccessConfig config,
       ElasticsearchSearchDao searchDao,
       ElasticsearchUpdateDao updateDao,
@@ -83,6 +85,7 @@ public class ElasticsearchDao implements IndexDao {
 
   public ElasticsearchDao() {
     //uninitialized.
+    refreshPolicy = WriteRequest.RefreshPolicy.NONE;
   }
 
   public AccessConfig getAccessConfig() {
@@ -100,10 +103,10 @@ public class ElasticsearchDao implements IndexDao {
       this.accessConfig = config;
       this.columnMetadataDao = new ElasticsearchColumnMetadataDao(this.client);
       this.requestSubmitter = new ElasticsearchRequestSubmitter(this.client);
-      this.searchDao = new ElasticsearchSearchDao(client, accessConfig, 
columnMetadataDao,
-          requestSubmitter);
+      this.searchDao = new ElasticsearchSearchDao(client, accessConfig, 
columnMetadataDao, requestSubmitter);
       this.retrieveLatestDao = new ElasticsearchRetrieveLatestDao(client);
-      this.updateDao = new ElasticsearchUpdateDao(client, accessConfig, 
retrieveLatestDao);
+      this.updateDao = new ElasticsearchUpdateDao(client, accessConfig, 
retrieveLatestDao)
+              .withRefreshPolicy(refreshPolicy);
     }
 
     if (columnMetadataDao == null) {
@@ -187,6 +190,11 @@ public class ElasticsearchDao implements IndexDao {
     return this.updateDao.removeCommentFromAlert(request, latest);
   }
 
+  public ElasticsearchDao withRefreshPolicy(WriteRequest.RefreshPolicy 
refreshPolicy) {
+    this.refreshPolicy = refreshPolicy;
+    return this;
+  }
+
   protected Optional<String> getIndexName(String guid, String sensorType) 
throws IOException {
     return updateDao.getIndexName(guid, sensorType);
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/89a2beda/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
index c769b2f..ba852aa 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchUpdateDao.java
@@ -40,6 +40,7 @@ import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
+import org.elasticsearch.action.support.WriteRequest;
 import 
org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,6 +52,7 @@ public class ElasticsearchUpdateDao implements UpdateDao {
   private transient ElasticsearchClient client;
   private AccessConfig accessConfig;
   private ElasticsearchRetrieveLatestDao retrieveLatestDao;
+  private WriteRequest.RefreshPolicy refreshPolicy;
 
   public ElasticsearchUpdateDao(ElasticsearchClient client,
       AccessConfig accessConfig,
@@ -58,6 +60,7 @@ public class ElasticsearchUpdateDao implements UpdateDao {
     this.client = client;
     this.accessConfig = accessConfig;
     this.retrieveLatestDao = searchDao;
+    this.refreshPolicy = WriteRequest.RefreshPolicy.NONE;
   }
 
   @Override
@@ -89,6 +92,7 @@ public class ElasticsearchUpdateDao implements UpdateDao {
         
.getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new 
Date());
 
     BulkRequest bulkRequestBuilder = new BulkRequest();
+    bulkRequestBuilder.setRefreshPolicy(refreshPolicy);
 
     // Get the indices we'll actually be using for each Document.
     for (Map.Entry<Document, Optional<String>> updateEntry : 
updates.entrySet()) {
@@ -182,6 +186,11 @@ public class ElasticsearchUpdateDao implements UpdateDao {
     return update(newVersion, Optional.empty());
   }
 
+  public ElasticsearchUpdateDao withRefreshPolicy(WriteRequest.RefreshPolicy 
refreshPolicy) {
+    this.refreshPolicy = refreshPolicy;
+    return this;
+  }
+
   protected String getIndexName(Document update, Optional<String> index, 
String indexPostFix) throws IOException {
     return index.orElse(getIndexName(update.getGuid(), update.getSensorType())
         .orElse(ElasticsearchUtils.getIndexName(update.getSensorType(), 
indexPostFix, null))

http://git-wip-us.apache.org/repos/asf/metron/blob/89a2beda/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
index df5e96a..2784db3 100644
--- 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchIndexingIntegrationTest.java
@@ -18,10 +18,10 @@
 package org.apache.metron.elasticsearch.integration;
 
 import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.common.field.DeDotFieldNameConverter;
 import org.apache.metron.common.field.FieldNameConverter;
 import org.apache.metron.common.field.FieldNameConverters;
 import 
org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
+import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.integration.IndexingIntegrationTest;
 import org.apache.metron.integration.ComponentRunner;
 import org.apache.metron.integration.InMemoryComponent;
@@ -34,6 +34,7 @@ import java.io.File;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -67,10 +68,22 @@ public class ElasticsearchIndexingIntegrationTest extends 
IndexingIntegrationTes
 
   @Override
   public InMemoryComponent getSearchComponent(final Properties 
topologyProperties) {
+    Map<String, Object> globalConfig = new HashMap<String, Object>() {
+      {
+        put("es.clustername", "metron");
+        put("es.port", "9200");
+        put("es.ip", "localhost");
+        put("es.date.format", dateFormat);
+      }
+    };
+    AccessConfig accessConfig = new AccessConfig();
+    accessConfig.setGlobalConfigSupplier(() -> globalConfig);
+
     return new ElasticSearchComponent.Builder()
             .withHttpPort(9211)
             .withIndexDir(new File(indexDir))
             .withMapping(index, "yaf_doc", mapping)
+            .withAccessConfig(accessConfig)
             .build();
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/89a2beda/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java
index 03b1639..cba0f65 100644
--- 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchMetaAlertIntegrationTest.java
@@ -19,28 +19,7 @@
 
 package org.apache.metron.elasticsearch.integration;
 
-import static 
org.apache.metron.elasticsearch.dao.ElasticsearchMetaAlertDao.METAALERTS_INDEX;
-import static 
org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.ALERT_FIELD;
-import static 
org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_DOC;
-import static 
org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_FIELD;
-import static 
org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_TYPE;
-
 import com.fasterxml.jackson.core.JsonProcessingException;
-import java.io.File;
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
 import com.google.common.collect.ImmutableList;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.common.Constants;
@@ -50,13 +29,13 @@ import 
org.apache.metron.elasticsearch.dao.ElasticsearchMetaAlertDao;
 import 
org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.IndexDao;
-import org.apache.metron.indexing.dao.metaalert.MetaAlertDao;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertIntegrationTest;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
 import org.apache.metron.indexing.dao.search.GetRequest;
 import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.search.SortField;
+import org.json.simple.parser.ParseException;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -66,11 +45,33 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.io.File;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.metron.elasticsearch.dao.ElasticsearchMetaAlertDao.METAALERTS_INDEX;
+import static 
org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.ALERT_FIELD;
+import static 
org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_DOC;
+import static 
org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_FIELD;
+import static 
org.apache.metron.indexing.dao.metaalert.MetaAlertConstants.METAALERT_TYPE;
+
 @RunWith(Parameterized.class)
 public class ElasticsearchMetaAlertIntegrationTest extends 
MetaAlertIntegrationTest {
 
   private static IndexDao esDao;
   private static ElasticSearchComponent es;
+  private static AccessConfig accessConfig;
 
   protected static final String INDEX_DIR = "target/elasticsearch_meta";
   private static String POSTFIX= new SimpleDateFormat(DATE_FORMAT).format(new 
Date());
@@ -127,10 +128,25 @@ public class ElasticsearchMetaAlertIntegrationTest 
extends MetaAlertIntegrationT
     // Ensure ES can retry as needed.
     MAX_RETRIES = 10;
 
-    // setup the client
+    Map<String, Object> globalConfig = new HashMap<String, Object>() {
+      {
+        put("es.clustername", "metron");
+        put("es.port", "9200");
+        put("es.ip", "localhost");
+        put("es.date.format", DATE_FORMAT);
+      }
+    };
+
+    accessConfig = new AccessConfig();
+    accessConfig.setMaxSearchResults(1000);
+    accessConfig.setMaxSearchGroups(100);
+    accessConfig.setGlobalConfigSupplier(() -> globalConfig);
+
+    // start elasticsearch
     es = new ElasticSearchComponent.Builder()
             .withHttpPort(9211)
             .withIndexDir(new File(INDEX_DIR))
+            .withAccessConfig(accessConfig)
             .build();
     es.start();
   }
@@ -140,21 +156,9 @@ public class ElasticsearchMetaAlertIntegrationTest extends 
MetaAlertIntegrationT
     es.createIndexWithMapping(METAALERTS_INDEX, METAALERT_DOC, 
template.replace("%MAPPING_NAME%", METAALERT_TYPE));
     es.createIndexWithMapping(INDEX, "test_doc", 
template.replace("%MAPPING_NAME%", "test"));
 
-    AccessConfig accessConfig = new AccessConfig();
-    Map<String, Object> globalConfig = new HashMap<String, Object>() {
-      {
-        put("es.clustername", "metron");
-        put("es.port", "9200");
-        put("es.ip", "localhost");
-        put("es.date.format", DATE_FORMAT);
-      }
-    };
-    accessConfig.setMaxSearchResults(1000);
-    accessConfig.setGlobalConfigSupplier(() -> globalConfig);
-    accessConfig.setMaxSearchGroups(100);
-
     esDao = new ElasticsearchDao();
     esDao.init(accessConfig);
+
     ElasticsearchMetaAlertDao elasticsearchMetaDao = new 
ElasticsearchMetaAlertDao(esDao);
     elasticsearchMetaDao.setPageSize(5);
     metaDao = elasticsearchMetaDao;
@@ -321,7 +325,7 @@ public class ElasticsearchMetaAlertIntegrationTest extends 
MetaAlertIntegrationT
 
   @Override
   protected void addRecords(List<Map<String, Object>> inputData, String index, 
String docType)
-          throws IOException {
+          throws IOException, ParseException {
     es.add(index, docType, inputData.stream().map(m -> {
               try {
                 return JSONUtils.INSTANCE.toJSON(m, true);

http://git-wip-us.apache.org/repos/asf/metron/blob/89a2beda/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
index d03da0e..7036078 100644
--- 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
@@ -18,17 +18,6 @@
 package org.apache.metron.elasticsearch.integration;
 
 
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import org.apache.http.HttpEntity;
 import org.apache.http.entity.ContentType;
 import org.apache.http.nio.entity.NStringEntity;
@@ -48,23 +37,33 @@ import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.search.SearchResult;
 import org.apache.metron.integration.InMemoryComponent;
-import org.elasticsearch.action.bulk.BulkRequest;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequest;
-import org.elasticsearch.action.support.WriteRequest.RefreshPolicy;
 import org.elasticsearch.client.Response;
 import org.elasticsearch.client.RestClient;
 import org.elasticsearch.client.RestHighLevelClient;
 import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
 
 public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest {
 
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
   private static String indexDir = "target/elasticsearch_search";
   private static String dateFormat = "yyyy.MM.dd.HH";
   private static String broTemplatePath = 
"../../metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/files/bro_index.template";
@@ -72,48 +71,51 @@ public class ElasticsearchSearchIntegrationTest extends 
SearchIntegrationTest {
   protected static final String BRO_INDEX = "bro_index_2017.01.01.01";
   protected static final String SNORT_INDEX = "snort_index_2017.01.01.02";
   protected static Map<String, Object> globalConfig;
+  protected static AccessConfig accessConfig;
   protected static RestClient lowLevelClient;
   protected static RestHighLevelClient highLevelClient;
   protected static IndexDao dao;
 
   @BeforeClass
   public static void setup() throws Exception {
-    indexComponent = startIndex();
-    globalConfig = new HashMap<String, Object>() {{
+    globalConfig =  new HashMap<String, Object>() {{
       put("es.clustername", "metron");
       put("es.port", "9200");
       put("es.ip", "localhost");
       put("es.date.format", dateFormat);
     }};
-    ElasticsearchClient esClient = 
ElasticsearchClientFactory.create(globalConfig);
-    lowLevelClient = esClient.getLowLevelClient();
-    highLevelClient = esClient.getHighLevelClient();
-    dao = createDao(globalConfig);
-    // The data is all static for searches, so we can set it up beforehand, 
and it's faster
-    loadTestData();
-  }
 
-  protected static IndexDao createDao(Map<String, Object> globalConfig) {
-    AccessConfig accessConfig = new AccessConfig();
+    accessConfig = new AccessConfig();
     accessConfig.setMaxSearchResults(100);
     accessConfig.setMaxSearchGroups(100);
     accessConfig.setGlobalConfigSupplier(() -> globalConfig);
 
-    IndexDao dao = new ElasticsearchDao();
+    indexComponent = startIndex();
+
+    ElasticsearchClient esClient = 
ElasticsearchClientFactory.create(globalConfig);
+    lowLevelClient = esClient.getLowLevelClient();
+    highLevelClient = esClient.getHighLevelClient();
+
+    dao = new ElasticsearchDao();
     dao.init(accessConfig);
-    return dao;
+
+    // The data is all static for searches, so we can set it up beforehand, 
and it's faster
+    loadTestData();
   }
 
   protected static InMemoryComponent startIndex() throws Exception {
     InMemoryComponent es = new ElasticSearchComponent.Builder()
             .withHttpPort(9211)
             .withIndexDir(new File(indexDir))
+            .withAccessConfig(accessConfig)
             .build();
     es.start();
     return es;
   }
 
-  protected static void loadTestData() throws ParseException, IOException {
+  protected static void loadTestData() throws Exception {
+    ElasticSearchComponent es = (ElasticSearchComponent) indexComponent;
+
     // add bro template
     JSONObject broTemplate = JSONUtils.INSTANCE.load(new 
File(broTemplatePath), JSONObject.class);
     addTestFieldMappings(broTemplate, "bro_doc");
@@ -121,6 +123,7 @@ public class ElasticsearchSearchIntegrationTest extends 
SearchIntegrationTest {
     HttpEntity broEntity = new NStringEntity(broTemplateJson, 
ContentType.APPLICATION_JSON);
     Response response = lowLevelClient.performRequest("PUT", 
"/_template/bro_template", Collections.emptyMap(), broEntity);
     assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
+
     // add snort template
     JSONObject snortTemplate = JSONUtils.INSTANCE.load(new 
File(snortTemplatePath), JSONObject.class);
     addTestFieldMappings(snortTemplate, "snort_doc");
@@ -128,42 +131,28 @@ public class ElasticsearchSearchIntegrationTest extends 
SearchIntegrationTest {
     HttpEntity snortEntity = new NStringEntity(snortTemplateJson, 
ContentType.APPLICATION_JSON);
     response = lowLevelClient.performRequest("PUT", 
"/_template/snort_template", Collections.emptyMap(), snortEntity);
     assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
+
     // create bro index
     response = lowLevelClient.performRequest("PUT", BRO_INDEX);
     assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
+
     // create snort index
     response = lowLevelClient.performRequest("PUT", SNORT_INDEX);
     assertThat(response.getStatusLine().getStatusCode(), equalTo(200));
 
-    JSONArray broRecords = (JSONArray) new JSONParser().parse(broData);
-
-    BulkRequest bulkRequest = new BulkRequest();
-    for (Object o : broRecords) {
-      JSONObject json = (JSONObject) o;
-      IndexRequest indexRequest = new IndexRequest(BRO_INDEX, "bro_doc", 
(String) json.get("guid"));
-      indexRequest.source(json);
-      indexRequest.timestamp(json.get("timestamp").toString());
-      bulkRequest.add(indexRequest);
+    // write the test documents for Bro
+    List<String> broDocuments = new ArrayList<>();
+    for (Object broObject: (JSONArray) new JSONParser().parse(broData)) {
+      broDocuments.add(((JSONObject) broObject).toJSONString());
     }
-    bulkRequest.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL);
-    BulkResponse bulkResponse = highLevelClient.bulk(bulkRequest);
-    assertFalse(bulkResponse.hasFailures());
-    assertThat(bulkResponse.status().getStatus(), equalTo(200));
-
-    JSONArray snortRecords = (JSONArray) new JSONParser().parse(snortData);
-
-    bulkRequest = new BulkRequest();
-    for (Object o : snortRecords) {
-      JSONObject json = (JSONObject) o;
-      IndexRequest indexRequest = new IndexRequest(SNORT_INDEX, "snort_doc", 
(String) json.get("guid"));
-      indexRequest.source(json);
-      indexRequest.timestamp(json.get("timestamp").toString());
-      bulkRequest.add(indexRequest);
+    es.add(BRO_INDEX, "bro", broDocuments);
+
+    // write the test documents for Snort
+    List<String> snortDocuments = new ArrayList<>();
+    for (Object snortObject: (JSONArray) new JSONParser().parse(snortData)) {
+      snortDocuments.add(((JSONObject) snortObject).toJSONString());
     }
-    bulkRequest.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL);
-    bulkResponse = highLevelClient.bulk(bulkRequest);
-    assertFalse(bulkResponse.hasFailures());
-    assertThat(bulkResponse.status().getStatus(), equalTo(200));
+    es.add(SNORT_INDEX, "snort", snortDocuments);
   }
 
   /**
@@ -201,7 +190,6 @@ public class ElasticsearchSearchIntegrationTest extends 
SearchIntegrationTest {
   public void returns_column_metadata_for_specified_indices() throws Exception 
{
     // getColumnMetadata with only bro
     {
-      Assert.assertEquals(262, 
dao.getColumnMetadata(Collections.singletonList("bro")).size());
       Map<String, FieldType> fieldTypes = 
dao.getColumnMetadata(Collections.singletonList("bro"));
       Assert.assertEquals(262, fieldTypes.size());
       Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("method"));
@@ -220,7 +208,6 @@ public class ElasticsearchSearchIntegrationTest extends 
SearchIntegrationTest {
     }
     // getColumnMetadata with only snort
     {
-      Assert.assertEquals(32, 
dao.getColumnMetadata(Collections.singletonList("snort")).size());
       Map<String, FieldType> fieldTypes = 
dao.getColumnMetadata(Collections.singletonList("snort"));
       Assert.assertEquals(32, fieldTypes.size());
       Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("sig_generator"));
@@ -242,7 +229,6 @@ public class ElasticsearchSearchIntegrationTest extends 
SearchIntegrationTest {
 
   @Override
   public void returns_column_data_for_multiple_indices() throws Exception {
-    Assert.assertEquals(277, dao.getColumnMetadata(Arrays.asList("bro", 
"snort")).size());
     Map<String, FieldType> fieldTypes = 
dao.getColumnMetadata(Arrays.asList("bro", "snort"));
     Assert.assertEquals(277, fieldTypes.size());
 

http://git-wip-us.apache.org/repos/asf/metron/blob/89a2beda/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
index 6f36790..6489206 100644
--- 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
@@ -19,13 +19,6 @@ package org.apache.metron.elasticsearch.integration;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.Iterables;
-import java.io.File;
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.metron.common.utils.JSONUtils;
@@ -44,6 +37,14 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 
+import java.io.File;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 public class ElasticsearchUpdateIntegrationTest extends UpdateIntegrationTest {
   private static final String SENSOR_NAME= "test";
   private static String indexDir = "target/elasticsearch_mutation";
@@ -55,6 +56,9 @@ public class ElasticsearchUpdateIntegrationTest extends 
UpdateIntegrationTest {
   private static final String CF = "p";
   private static MockHTable table;
   private static IndexDao hbaseDao;
+  private static IndexDao elasticsearchDao;
+  private static AccessConfig accessConfig;
+  private static Map<String, Object> globalConfig;
 
   @Override
   protected String getIndexName() {
@@ -62,30 +66,37 @@ public class ElasticsearchUpdateIntegrationTest extends 
UpdateIntegrationTest {
   }
 
   @BeforeClass
-  public static void setupBeforeClass() throws UnableToStartException {
-    es = new ElasticSearchComponent.Builder()
-        .withHttpPort(9211)
-        .withIndexDir(new File(indexDir))
-        .build();
-    es.start();
-  }
-
-  @Before
-  public void setup() throws IOException {
+  public static void setupBeforeClass() throws UnableToStartException, 
IOException {
     Configuration config = HBaseConfiguration.create();
     MockHBaseTableProvider tableProvider = new MockHBaseTableProvider();
     MockHBaseTableProvider.addToCache(TABLE_NAME, CF);
     table = (MockHTable) tableProvider.getTable(config, TABLE_NAME);
 
-    hbaseDao = new HBaseDao();
-    AccessConfig accessConfig = new AccessConfig();
-    accessConfig.setTableProvider(tableProvider);
-    Map<String, Object> globalConfig = createGlobalConfig();
+    globalConfig = new HashMap<>();
+    globalConfig.put("es.clustername", "metron");
+    globalConfig.put("es.port", "9200");
+    globalConfig.put("es.ip", "localhost");
+    globalConfig.put("es.date.format", dateFormat);
     globalConfig.put(HBaseDao.HBASE_TABLE, TABLE_NAME);
     globalConfig.put(HBaseDao.HBASE_CF, CF);
+
+    accessConfig = new AccessConfig();
+    accessConfig.setTableProvider(tableProvider);
     accessConfig.setGlobalConfigSupplier(() -> globalConfig);
 
-    MultiIndexDao dao = new MultiIndexDao(hbaseDao, createDao());
+    es = new ElasticSearchComponent.Builder()
+            .withHttpPort(9211)
+            .withIndexDir(new File(indexDir))
+            .withAccessConfig(accessConfig)
+            .build();
+    es.start();
+  }
+
+  @Before
+  public void setup() {
+    hbaseDao = new HBaseDao();
+    elasticsearchDao = new ElasticsearchDao();
+    MultiIndexDao dao = new MultiIndexDao(hbaseDao, elasticsearchDao);
     dao.init(accessConfig);
     setDao(dao);
   }
@@ -101,19 +112,6 @@ public class ElasticsearchUpdateIntegrationTest extends 
UpdateIntegrationTest {
     es.stop();
   }
 
-  protected static Map<String, Object> createGlobalConfig() {
-    return new HashMap<String, Object>() {{
-      put("es.clustername", "metron");
-      put("es.port", "9200");
-      put("es.ip", "localhost");
-      put("es.date.format", dateFormat);
-    }};
-  }
-
-  protected static IndexDao createDao() {
-    return new ElasticsearchDao();
-  }
-
   @Override
   protected void addTestData(String indexName, String sensorType,
       List<Map<String, Object>> docs) throws Exception {

http://git-wip-us.apache.org/repos/asf/metron/blob/89a2beda/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java
index 3e14c00..227f5ef 100644
--- 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java
@@ -24,15 +24,36 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
+
+import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.commons.io.FileUtils;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.elasticsearch.client.ElasticsearchClient;
+import org.apache.metron.elasticsearch.client.ElasticsearchClientFactory;
+import org.apache.metron.elasticsearch.dao.ElasticsearchColumnMetadataDao;
+import org.apache.metron.elasticsearch.dao.ElasticsearchDao;
+import org.apache.metron.elasticsearch.dao.ElasticsearchRequestSubmitter;
+import org.apache.metron.elasticsearch.dao.ElasticsearchRetrieveLatestDao;
+import org.apache.metron.elasticsearch.dao.ElasticsearchSearchDao;
+import org.apache.metron.elasticsearch.dao.ElasticsearchUpdateDao;
+import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
+import org.apache.metron.indexing.dao.AccessConfig;
+import org.apache.metron.indexing.dao.IndexDao;
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
+import org.apache.metron.indexing.dao.search.SearchRequest;
+import org.apache.metron.indexing.dao.update.Document;
+import org.apache.metron.indexing.dao.update.UpdateDao;
 import org.apache.metron.integration.InMemoryComponent;
 import org.apache.metron.integration.UnableToStartException;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
@@ -45,6 +66,7 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.support.WriteRequest;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.common.settings.Settings;
@@ -54,6 +76,10 @@ import org.elasticsearch.node.NodeValidationException;
 import org.elasticsearch.plugins.Plugin;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.transport.Netty4Plugin;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
 
 public class ElasticSearchComponent implements InMemoryComponent {
 
@@ -75,6 +101,7 @@ public class ElasticSearchComponent implements 
InMemoryComponent {
     private File indexDir;
     private Map<String, String> extraElasticSearchSettings = null;
     private List<Mapping> mappings = new ArrayList<>();
+    private AccessConfig accessConfig = new AccessConfig();
 
     public Builder withMapping(String index, String docType, String mapping) {
       mappings.add(new Mapping(index, docType, mapping));
@@ -97,8 +124,13 @@ public class ElasticSearchComponent implements 
InMemoryComponent {
       return this;
     }
 
+    public Builder withAccessConfig(AccessConfig accessConfig) {
+      this.accessConfig = accessConfig;
+      return this;
+    }
+
     public ElasticSearchComponent build() {
-      return new ElasticSearchComponent(httpPort, indexDir, 
extraElasticSearchSettings, mappings);
+      return new ElasticSearchComponent(httpPort, indexDir, 
extraElasticSearchSettings, mappings, accessConfig);
     }
   }
 
@@ -109,13 +141,16 @@ public class ElasticSearchComponent implements 
InMemoryComponent {
   private File indexDir;
   private Map<String, String> extraElasticSearchSettings;
   private List<Mapping> mappings;
+  private AccessConfig accessConfig;
+  private IndexDao indexDao;
 
   public ElasticSearchComponent(int httpPort, File indexDir,
-      Map<String, String> extraElasticSearchSettings, List<Mapping> mappings) {
+      Map<String, String> extraElasticSearchSettings, List<Mapping> mappings, 
AccessConfig accessConfig) {
     this.httpPort = httpPort;
     this.indexDir = indexDir;
     this.extraElasticSearchSettings = extraElasticSearchSettings;
     this.mappings = mappings;
+    this.accessConfig = accessConfig;
   }
 
   @Override
@@ -153,6 +188,10 @@ public class ElasticSearchComponent implements 
InMemoryComponent {
           client.admin().indices().prepareCreate(m.index)
             .addMapping(m.docType, m.mapping).get();
     }
+
+    indexDao = new ElasticsearchDao()
+            .withRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
+    indexDao.init(accessConfig);
   }
 
   private void cleanDir(File dir) throws IOException {
@@ -194,35 +233,41 @@ public class ElasticSearchComponent implements 
InMemoryComponent {
     return client;
   }
 
-  public BulkResponse add(String indexName, String sensorType, String... docs) 
throws IOException {
+  public void add(String indexName, String sensorType, String... docs)
+          throws IOException, ParseException {
     List<String> d = new ArrayList<>();
     Collections.addAll(d, docs);
-    return add(indexName, sensorType, d);
+    add(indexName, sensorType, d);
   }
 
-  public BulkResponse add(String indexName, String sensorType, 
Iterable<String> docs)
-      throws IOException {
-    BulkRequestBuilder bulkRequest = getClient().prepareBulk();
-    for (String doc : docs) {
-      IndexRequestBuilder indexRequestBuilder = getClient()
-          .prepareIndex(indexName, sensorType + "_doc");
-
-      indexRequestBuilder = indexRequestBuilder.setSource(doc);
-      Map<String, Object> esDoc = JSONUtils.INSTANCE
-          .load(doc, JSONUtils.MAP_SUPPLIER);
-      indexRequestBuilder.setId((String) esDoc.get(Constants.GUID));
-      Object ts = esDoc.get("timestamp");
-      if (ts != null) {
-        indexRequestBuilder = indexRequestBuilder.setTimestamp(ts.toString());
-      }
-      bulkRequest.add(indexRequestBuilder);
-    }
+  public void add(String indexName, String sensorType, Iterable<String> docs)
+          throws IOException, ParseException {
 
-    BulkResponse response = bulkRequest.execute().actionGet();
-    if (response.hasFailures()) {
-      throw new IOException(response.buildFailureMessage());
+    // create a collection of indexable documents
+    JSONParser parser = new JSONParser();
+    Map<Document, Optional<String>> documents = new HashMap<>();
+    for(String json: docs) {
+      JSONObject message = (JSONObject) parser.parse(json);
+      documents.put(createDocument(message, sensorType), 
Optional.of(indexName));
     }
-    return response;
+
+    // write the documents
+    indexDao.batchUpdate(documents);
+  }
+
+  /**
+   * Create an indexable Document from a JSON message.
+   *
+   * @param message The JSON message that needs indexed.
+   * @param docType The document type to write.
+   * @return The {@link Document} that was written.
+   * @throws IOException
+   */
+  private static Document createDocument(JSONObject message, String docType) 
throws IOException {
+    Long timestamp = ConversionUtils.convert(message.get("timestamp"), 
Long.class);
+    String source = message.toJSONString();
+    String guid = (String) message.get("guid");
+    return new Document(source, guid, docType, timestamp);
   }
 
   public void createIndexWithMapping(String indexName, String mappingType, 
String mappingSource)
@@ -246,7 +291,6 @@ public class ElasticSearchComponent implements 
InMemoryComponent {
     getClient().admin().indices().refresh(new RefreshRequest());
     SearchResponse response = getClient().prepareSearch(index)
         .setTypes(sourceType)
-//                .setSource("message") ??
         .setFrom(0)
         .setSize(1000)
         .execute().actionGet();

http://git-wip-us.apache.org/repos/asf/metron/blob/89a2beda/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java
 
b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java
index 24989b4..90bee80 100644
--- 
a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java
+++ 
b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/metaalert/MetaAlertIntegrationTest.java
@@ -55,6 +55,7 @@ import org.apache.metron.indexing.dao.search.SortOrder;
 import org.apache.metron.indexing.dao.update.Document;
 import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
 import org.apache.metron.indexing.dao.update.PatchRequest;
+import org.json.simple.parser.ParseException;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -1090,7 +1091,7 @@ public abstract class MetaAlertIntegrationTest {
       throws IOException, InterruptedException;
 
   protected abstract void addRecords(List<Map<String, Object>> inputData, 
String index,
-      String docType) throws IOException;
+      String docType) throws IOException, ParseException;
 
   protected abstract long getMatchingMetaAlertCount(String fieldName, String 
fieldValue)
       throws IOException, InterruptedException;

Reply via email to