http://git-wip-us.apache.org/repos/asf/metron/blob/e8213918/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerTest.java
 
b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerTest.java
deleted file mode 100644
index df485f0..0000000
--- 
a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerTest.java
+++ /dev/null
@@ -1,855 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.dataloads.bulk;
-
-import com.carrotsearch.hppc.ObjectObjectHashMap;
-import org.apache.commons.collections.IteratorUtils;
-import org.apache.metron.TestConstants;
-import org.apache.metron.common.configuration.Configuration;
-import org.easymock.EasyMock;
-import org.elasticsearch.action.*;
-import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
-import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
-import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
-import 
org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
-import org.elasticsearch.action.admin.indices.alias.IndicesAliasesResponse;
-import 
org.elasticsearch.action.admin.indices.alias.exists.AliasesExistRequestBuilder;
-import 
org.elasticsearch.action.admin.indices.alias.exists.AliasesExistResponse;
-import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
-import 
org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequestBuilder;
-import org.elasticsearch.action.admin.indices.alias.get.GetAliasesResponse;
-import org.elasticsearch.action.admin.indices.analyze.AnalyzeRequest;
-import org.elasticsearch.action.admin.indices.analyze.AnalyzeRequestBuilder;
-import org.elasticsearch.action.admin.indices.analyze.AnalyzeResponse;
-import 
org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheRequest;
-import 
org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheRequestBuilder;
-import 
org.elasticsearch.action.admin.indices.cache.clear.ClearIndicesCacheResponse;
-import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
-import org.elasticsearch.action.admin.indices.close.CloseIndexRequestBuilder;
-import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
-import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
-import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
-import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
-import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
-import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequestBuilder;
-import 
org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
-import org.elasticsearch.action.admin.indices.exists.types.TypesExistsRequest;
-import 
org.elasticsearch.action.admin.indices.exists.types.TypesExistsRequestBuilder;
-import org.elasticsearch.action.admin.indices.exists.types.TypesExistsResponse;
-import org.elasticsearch.action.admin.indices.flush.*;
-import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
-import 
org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequestBuilder;
-import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
-import org.elasticsearch.action.admin.indices.get.GetIndexRequestBuilder;
-import org.elasticsearch.action.admin.indices.get.GetIndexResponse;
-import org.elasticsearch.action.admin.indices.mapping.get.*;
-import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
-import 
org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder;
-import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
-import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
-import org.elasticsearch.action.admin.indices.open.OpenIndexRequestBuilder;
-import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
-import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest;
-import org.elasticsearch.action.admin.indices.recovery.RecoveryRequestBuilder;
-import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
-import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
-import org.elasticsearch.action.admin.indices.refresh.RefreshRequestBuilder;
-import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
-import org.elasticsearch.action.admin.indices.segments.IndicesSegmentResponse;
-import org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequest;
-import 
org.elasticsearch.action.admin.indices.segments.IndicesSegmentsRequestBuilder;
-import org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequest;
-import 
org.elasticsearch.action.admin.indices.settings.get.GetSettingsRequestBuilder;
-import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
-import 
org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
-import 
org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder;
-import 
org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsResponse;
-import 
org.elasticsearch.action.admin.indices.shards.IndicesShardStoreRequestBuilder;
-import org.elasticsearch.action.admin.indices.shards.IndicesShardStoresRequest;
-import 
org.elasticsearch.action.admin.indices.shards.IndicesShardStoresResponse;
-import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
-import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
-import org.elasticsearch.action.admin.indices.stats.IndicesStatsResponse;
-import 
org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequest;
-import 
org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateRequestBuilder;
-import 
org.elasticsearch.action.admin.indices.template.delete.DeleteIndexTemplateResponse;
-import 
org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesRequest;
-import 
org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesRequestBuilder;
-import 
org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
-import 
org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
-import 
org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder;
-import 
org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateResponse;
-import org.elasticsearch.action.admin.indices.upgrade.get.UpgradeStatusRequest;
-import 
org.elasticsearch.action.admin.indices.upgrade.get.UpgradeStatusRequestBuilder;
-import 
org.elasticsearch.action.admin.indices.upgrade.get.UpgradeStatusResponse;
-import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequest;
-import 
org.elasticsearch.action.admin.indices.upgrade.post.UpgradeRequestBuilder;
-import org.elasticsearch.action.admin.indices.upgrade.post.UpgradeResponse;
-import 
org.elasticsearch.action.admin.indices.validate.query.ValidateQueryRequest;
-import 
org.elasticsearch.action.admin.indices.validate.query.ValidateQueryRequestBuilder;
-import 
org.elasticsearch.action.admin.indices.validate.query.ValidateQueryResponse;
-import 
org.elasticsearch.action.admin.indices.warmer.delete.DeleteWarmerRequest;
-import 
org.elasticsearch.action.admin.indices.warmer.delete.DeleteWarmerRequestBuilder;
-import 
org.elasticsearch.action.admin.indices.warmer.delete.DeleteWarmerResponse;
-import org.elasticsearch.action.admin.indices.warmer.get.GetWarmersRequest;
-import 
org.elasticsearch.action.admin.indices.warmer.get.GetWarmersRequestBuilder;
-import org.elasticsearch.action.admin.indices.warmer.get.GetWarmersResponse;
-import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerRequest;
-import 
org.elasticsearch.action.admin.indices.warmer.put.PutWarmerRequestBuilder;
-import org.elasticsearch.action.admin.indices.warmer.put.PutWarmerResponse;
-import org.elasticsearch.client.AdminClient;
-import org.elasticsearch.client.Client;
-import org.elasticsearch.client.ClusterAdminClient;
-import org.elasticsearch.client.IndicesAdminClient;
-import org.elasticsearch.cluster.ClusterState;
-import org.elasticsearch.cluster.metadata.IndexMetaData;
-import org.elasticsearch.cluster.metadata.MetaData;
-import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.collect.ImmutableOpenMap;
-import org.elasticsearch.index.IndexNotFoundException;
-import org.elasticsearch.threadpool.ThreadPool;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.io.PrintStream;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.concurrent.TimeUnit;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.powermock.api.easymock.PowerMock.replayAll;
-import static org.powermock.api.easymock.PowerMock.verifyAll;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(DeleteIndexResponse.class)
-public class ElasticsearchDataPrunerTest {
-
-    private Date testDate;
-    private DateFormat dateFormat = new SimpleDateFormat("yyyy.MM.dd.HH");
-    private Configuration configuration;
-
-    private Client indexClient = mock(Client.class);
-    private AdminClient adminClient = mock(AdminClient.class);
-    private IndicesAdminClient indicesAdminClient = new 
TestIndicesAdminClient();
-    private DeleteIndexRequestBuilder deleteIndexRequestBuilder = 
mock(DeleteIndexRequestBuilder.class);
-    private DeleteIndexRequest deleteIndexRequest = 
mock(DeleteIndexRequest.class);
-    private ActionFuture<DeleteIndexResponse> deleteIndexAction = 
mock(ActionFuture.class);
-    private DeleteIndexResponse deleteIndexResponse = 
PowerMock.createMock(DeleteIndexResponse.class);
-
-
-    private ByteArrayOutputStream outContent;
-    private ByteArrayOutputStream errContent;
-
-    @Before
-    public void setUp() throws Exception {
-
-        Calendar calendar = Calendar.getInstance();
-        calendar.set(Calendar.MONTH, Calendar.MARCH);
-        calendar.set(Calendar.YEAR, 2016);
-        calendar.set(Calendar.DATE, 31);
-        calendar.set(Calendar.HOUR_OF_DAY, 0);
-        calendar.set(Calendar.MINUTE, 0);
-        calendar.set(Calendar.SECOND, 0);
-        calendar.set(Calendar.MILLISECOND,0);
-        testDate = calendar.getTime();
-
-        when(indexClient.admin()).thenReturn(adminClient);
-        when(adminClient.indices()).thenReturn(indicesAdminClient);
-        
when(deleteIndexRequestBuilder.request()).thenReturn(deleteIndexRequest);
-        when(deleteIndexAction.actionGet()).thenReturn(deleteIndexResponse);
-
-        File resourceFile = new File(TestConstants.SAMPLE_CONFIG_PATH);
-        Path resourcePath = Paths.get(resourceFile.getCanonicalPath());
-
-        configuration = new Configuration(resourcePath);
-
-        outContent = new ByteArrayOutputStream();
-        errContent = new ByteArrayOutputStream();
-
-        System.setOut(new PrintStream(outContent));
-        System.setErr(new PrintStream(errContent));
-
-    }
-
-    @Test(expected = IndexNotFoundException.class)
-    public void testWillThrowOnMissingIndex() throws Exception {
-
-        ((TestIndicesAdminClient)indicesAdminClient).throwMissingIndex = true;
-        ElasticsearchDataPruner pruner = new ElasticsearchDataPruner(testDate, 
30, configuration, indexClient,"*");
-        pruner.deleteIndex(adminClient, "baz");
-        ((TestIndicesAdminClient)indicesAdminClient).throwMissingIndex = false;
-
-    }
-
-    @Test
-    public void testDeletesCorrectIndexes() throws Exception {
-
-        //Mock Cluster Admin
-        ClusterAdminClient clusterAdminClient = mock(ClusterAdminClient.class);
-        ClusterStateRequestBuilder clusterStateRequestBuilder = 
mock(ClusterStateRequestBuilder.class);
-        ClusterStateResponse clusterStateResponse = 
mock(ClusterStateResponse.class);
-        ClusterState clusterState = mock(ClusterState.class);
-        ObjectObjectHashMap<String, IndexMetaData> clusterIndexes = new 
ObjectObjectHashMap();
-        MetaData clusterMetadata = mock(MetaData.class);
-        when(adminClient.cluster()).thenReturn(clusterAdminClient);
-        
when(clusterAdminClient.prepareState()).thenReturn(clusterStateRequestBuilder);
-        
when(clusterStateRequestBuilder.get()).thenReturn(clusterStateResponse);
-        when(clusterStateResponse.getState()).thenReturn(clusterState);
-        when(clusterState.getMetaData()).thenReturn(clusterMetadata);
-
-        int numDays = 5;
-
-        Date indexDate = new Date();
-
-        indexDate.setTime(testDate.getTime() - 
TimeUnit.DAYS.toMillis(numDays));
-
-        for (int i = 0; i < numDays * 24; i++) {
-
-            String indexName = "sensor_index_" + dateFormat.format(indexDate);
-            clusterIndexes.put(indexName, null);
-            indexDate.setTime(indexDate.getTime() + 
TimeUnit.HOURS.toMillis(1));
-
-        }
-
-        
when(clusterMetadata.getIndices()).thenReturn(ImmutableOpenMap.copyOf(clusterIndexes));
-
-
-        EasyMock.expect(deleteIndexResponse.isAcknowledged()).andReturn(true);
-
-        replayAll();
-        ElasticsearchDataPruner pruner = new ElasticsearchDataPruner(testDate, 
1, configuration, indexClient, "sensor_index_");
-        pruner.indexClient = indexClient;
-        Long deleteCount = pruner.prune();
-        assertEquals("Should have pruned 24 indices", 24L, 
deleteCount.longValue());
-        verifyAll();
-
-    }
-
-    @Test
-    public void testFilter() throws Exception {
-
-        ObjectObjectHashMap<String, IndexMetaData> indexNames = new 
ObjectObjectHashMap();
-        SimpleDateFormat dateChecker = new SimpleDateFormat("yyyyMMdd");
-        int numDays = 5;
-        String[] expectedIndices = new String[24];
-        Date indexDate = new Date();
-
-        indexDate.setTime(testDate.getTime() - 
TimeUnit.DAYS.toMillis(numDays));
-
-        for (int i = 0, j=0; i < numDays * 24; i++) {
-
-            String indexName = "sensor_index_" + dateFormat.format(indexDate);
-            //Delete 20160330
-            if( dateChecker.format(indexDate).equals("20160330") ){
-                expectedIndices[j++] = indexName;
-            }
-
-            indexNames.put(indexName, null);
-            indexDate.setTime(indexDate.getTime() + 
TimeUnit.HOURS.toMillis(1));
-
-        }
-
-        ImmutableOpenMap<String, IndexMetaData> testIndices = 
ImmutableOpenMap.copyOf(indexNames);
-
-        ElasticsearchDataPruner pruner = new ElasticsearchDataPruner(testDate, 
1, configuration,  indexClient, "sensor_index_");
-        pruner.indexClient = indexClient;
-
-        Iterable<String> filteredIndices = 
pruner.getFilteredIndices(testIndices);
-
-        Object[] indexArray = 
IteratorUtils.toArray(filteredIndices.iterator());
-        Arrays.sort(indexArray);
-        Arrays.sort(expectedIndices);
-
-        assertArrayEquals(expectedIndices,indexArray);
-
-    }
-
-    class TestIndicesAdminClient implements IndicesAdminClient {
-
-        public boolean throwMissingIndex = false;
-
-        @Override
-        public ActionFuture<DeleteIndexResponse> delete(DeleteIndexRequest 
request) {
-
-            if(throwMissingIndex){
-
-                throw new IndexNotFoundException("TEST EXCEPTION!");
-
-            }
-
-            return deleteIndexAction;
-
-        }
-
-
-        @Override
-        public ActionFuture<IndicesExistsResponse> exists(IndicesExistsRequest 
request) {
-            return null;
-        }
-
-        @Override
-        public void exists(IndicesExistsRequest request, 
ActionListener<IndicesExistsResponse> listener) {
-
-        }
-
-        @Override
-        public IndicesExistsRequestBuilder prepareExists(String... indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<TypesExistsResponse> 
typesExists(TypesExistsRequest request) {
-            return null;
-        }
-
-        @Override
-        public void typesExists(TypesExistsRequest request, 
ActionListener<TypesExistsResponse> listener) {
-
-        }
-
-        @Override
-        public TypesExistsRequestBuilder prepareTypesExists(String... index) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<IndicesStatsResponse> stats(IndicesStatsRequest 
request) {
-            return null;
-        }
-
-        @Override
-        public void stats(IndicesStatsRequest request, 
ActionListener<IndicesStatsResponse> listener) {
-
-        }
-
-        @Override
-        public IndicesStatsRequestBuilder prepareStats(String... indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<RecoveryResponse> recoveries(RecoveryRequest 
request) {
-            return null;
-        }
-
-        @Override
-        public void recoveries(RecoveryRequest request, 
ActionListener<RecoveryResponse> listener) {
-
-        }
-
-        @Override
-        public RecoveryRequestBuilder prepareRecoveries(String... indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<IndicesSegmentResponse> 
segments(IndicesSegmentsRequest request) {
-            return null;
-        }
-
-        @Override
-        public void segments(IndicesSegmentsRequest request, 
ActionListener<IndicesSegmentResponse> listener) {
-
-        }
-
-        @Override
-        public IndicesSegmentsRequestBuilder prepareSegments(String... 
indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<IndicesShardStoresResponse> 
shardStores(IndicesShardStoresRequest request) {
-            return null;
-        }
-
-        @Override
-        public void shardStores(IndicesShardStoresRequest request, 
ActionListener<IndicesShardStoresResponse> listener) {
-
-        }
-
-        @Override
-        public IndicesShardStoreRequestBuilder prepareShardStores(String... 
indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<CreateIndexResponse> create(CreateIndexRequest 
request) {
-            return null;
-        }
-
-        @Override
-        public void create(CreateIndexRequest request, 
ActionListener<CreateIndexResponse> listener) {
-
-        }
-
-        @Override
-        public CreateIndexRequestBuilder prepareCreate(String index) {
-            return null;
-        }
-
-
-        @Override
-        public void delete(DeleteIndexRequest request, 
ActionListener<DeleteIndexResponse> listener) {
-
-        }
-
-        @Override
-        public DeleteIndexRequestBuilder prepareDelete(String... indices) {
-            return deleteIndexRequestBuilder;
-        }
-
-        @Override
-        public ActionFuture<CloseIndexResponse> close(CloseIndexRequest 
request) {
-            return null;
-        }
-
-        @Override
-        public void close(CloseIndexRequest request, 
ActionListener<CloseIndexResponse> listener) {
-
-        }
-
-        @Override
-        public CloseIndexRequestBuilder prepareClose(String... indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<OpenIndexResponse> open(OpenIndexRequest request) {
-            return null;
-        }
-
-        @Override
-        public void open(OpenIndexRequest request, 
ActionListener<OpenIndexResponse> listener) {
-
-        }
-
-        @Override
-        public OpenIndexRequestBuilder prepareOpen(String... indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<RefreshResponse> refresh(RefreshRequest request) {
-            return null;
-        }
-
-        @Override
-        public void refresh(RefreshRequest request, 
ActionListener<RefreshResponse> listener) {
-
-        }
-
-        @Override
-        public RefreshRequestBuilder prepareRefresh(String... indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<FlushResponse> flush(FlushRequest request) {
-            return null;
-        }
-
-        @Override
-        public void flush(FlushRequest request, ActionListener<FlushResponse> 
listener) {
-
-        }
-
-        @Override
-        public FlushRequestBuilder prepareFlush(String... indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<SyncedFlushResponse> 
syncedFlush(SyncedFlushRequest request) {
-            return null;
-        }
-
-        @Override
-        public void syncedFlush(SyncedFlushRequest request, 
ActionListener<SyncedFlushResponse> listener) {
-
-        }
-
-        @Override
-        public SyncedFlushRequestBuilder prepareSyncedFlush(String... indices) 
{
-            return null;
-        }
-
-        @Override
-        public ActionFuture<ForceMergeResponse> forceMerge(ForceMergeRequest 
request) {
-            return null;
-        }
-
-        @Override
-        public void forceMerge(ForceMergeRequest request, 
ActionListener<ForceMergeResponse> listener) {
-
-        }
-
-        @Override
-        public ForceMergeRequestBuilder prepareForceMerge(String... indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<UpgradeResponse> upgrade(UpgradeRequest request) {
-            return null;
-        }
-
-        @Override
-        public void upgrade(UpgradeRequest request, 
ActionListener<UpgradeResponse> listener) {
-
-        }
-
-        @Override
-        public UpgradeStatusRequestBuilder prepareUpgradeStatus(String... 
indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<UpgradeStatusResponse> 
upgradeStatus(UpgradeStatusRequest request) {
-            return null;
-        }
-
-        @Override
-        public void upgradeStatus(UpgradeStatusRequest request, 
ActionListener<UpgradeStatusResponse> listener) {
-
-        }
-
-        @Override
-        public UpgradeRequestBuilder prepareUpgrade(String... indices) {
-            return null;
-        }
-
-        @Override
-        public void getMappings(GetMappingsRequest request, 
ActionListener<GetMappingsResponse> listener) {
-
-        }
-
-        @Override
-        public ActionFuture<GetMappingsResponse> 
getMappings(GetMappingsRequest request) {
-            return null;
-        }
-
-        @Override
-        public GetMappingsRequestBuilder prepareGetMappings(String... indices) 
{
-            return null;
-        }
-
-        @Override
-        public void getFieldMappings(GetFieldMappingsRequest request, 
ActionListener<GetFieldMappingsResponse> listener) {
-
-        }
-
-        @Override
-        public GetFieldMappingsRequestBuilder 
prepareGetFieldMappings(String... indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<GetFieldMappingsResponse> 
getFieldMappings(GetFieldMappingsRequest request) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<PutMappingResponse> putMapping(PutMappingRequest 
request) {
-            return null;
-        }
-
-        @Override
-        public void putMapping(PutMappingRequest request, 
ActionListener<PutMappingResponse> listener) {
-
-        }
-
-        @Override
-        public PutMappingRequestBuilder preparePutMapping(String... indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<IndicesAliasesResponse> 
aliases(IndicesAliasesRequest request) {
-            return null;
-        }
-
-        @Override
-        public void aliases(IndicesAliasesRequest request, 
ActionListener<IndicesAliasesResponse> listener) {
-
-        }
-
-        @Override
-        public IndicesAliasesRequestBuilder prepareAliases() {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<GetAliasesResponse> getAliases(GetAliasesRequest 
request) {
-            return null;
-        }
-
-        @Override
-        public void getAliases(GetAliasesRequest request, 
ActionListener<GetAliasesResponse> listener) {
-
-        }
-
-        @Override
-        public GetAliasesRequestBuilder prepareGetAliases(String... aliases) {
-            return null;
-        }
-
-        @Override
-        public AliasesExistRequestBuilder prepareAliasesExist(String... 
aliases) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<AliasesExistResponse> 
aliasesExist(GetAliasesRequest request) {
-            return null;
-        }
-
-        @Override
-        public void aliasesExist(GetAliasesRequest request, 
ActionListener<AliasesExistResponse> listener) {
-
-        }
-
-        @Override
-        public ActionFuture<GetIndexResponse> getIndex(GetIndexRequest 
request) {
-            return null;
-        }
-
-        @Override
-        public void getIndex(GetIndexRequest request, 
ActionListener<GetIndexResponse> listener) {
-
-        }
-
-        @Override
-        public GetIndexRequestBuilder prepareGetIndex() {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<ClearIndicesCacheResponse> 
clearCache(ClearIndicesCacheRequest request) {
-            return null;
-        }
-
-        @Override
-        public void clearCache(ClearIndicesCacheRequest request, 
ActionListener<ClearIndicesCacheResponse> listener) {
-
-        }
-
-        @Override
-        public ClearIndicesCacheRequestBuilder prepareClearCache(String... 
indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<UpdateSettingsResponse> 
updateSettings(UpdateSettingsRequest request) {
-            return null;
-        }
-
-        @Override
-        public void updateSettings(UpdateSettingsRequest request, 
ActionListener<UpdateSettingsResponse> listener) {
-
-        }
-
-        @Override
-        public UpdateSettingsRequestBuilder prepareUpdateSettings(String... 
indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<AnalyzeResponse> analyze(AnalyzeRequest request) {
-            return null;
-        }
-
-        @Override
-        public void analyze(AnalyzeRequest request, 
ActionListener<AnalyzeResponse> listener) {
-
-        }
-
-        @Override
-        public AnalyzeRequestBuilder prepareAnalyze(@Nullable String index, 
String text) {
-            return null;
-        }
-
-        @Override
-        public AnalyzeRequestBuilder prepareAnalyze(String text) {
-            return null;
-        }
-
-        @Override
-        public AnalyzeRequestBuilder prepareAnalyze() {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<PutIndexTemplateResponse> 
putTemplate(PutIndexTemplateRequest request) {
-            return null;
-        }
-
-        @Override
-        public void putTemplate(PutIndexTemplateRequest request, 
ActionListener<PutIndexTemplateResponse> listener) {
-
-        }
-
-        @Override
-        public PutIndexTemplateRequestBuilder preparePutTemplate(String name) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<DeleteIndexTemplateResponse> 
deleteTemplate(DeleteIndexTemplateRequest request) {
-            return null;
-        }
-
-        @Override
-        public void deleteTemplate(DeleteIndexTemplateRequest request, 
ActionListener<DeleteIndexTemplateResponse> listener) {
-
-        }
-
-        @Override
-        public DeleteIndexTemplateRequestBuilder prepareDeleteTemplate(String 
name) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<GetIndexTemplatesResponse> 
getTemplates(GetIndexTemplatesRequest request) {
-            return null;
-        }
-
-        @Override
-        public void getTemplates(GetIndexTemplatesRequest request, 
ActionListener<GetIndexTemplatesResponse> listener) {
-
-        }
-
-        @Override
-        public GetIndexTemplatesRequestBuilder prepareGetTemplates(String... 
name) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<ValidateQueryResponse> 
validateQuery(ValidateQueryRequest request) {
-            return null;
-        }
-
-        @Override
-        public void validateQuery(ValidateQueryRequest request, 
ActionListener<ValidateQueryResponse> listener) {
-
-        }
-
-        @Override
-        public ValidateQueryRequestBuilder prepareValidateQuery(String... 
indices) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<PutWarmerResponse> putWarmer(PutWarmerRequest 
request) {
-            return null;
-        }
-
-        @Override
-        public void putWarmer(PutWarmerRequest request, 
ActionListener<PutWarmerResponse> listener) {
-
-        }
-
-        @Override
-        public PutWarmerRequestBuilder preparePutWarmer(String name) {
-            return null;
-        }
-
-        @Override
-        public ActionFuture<DeleteWarmerResponse> 
deleteWarmer(DeleteWarmerRequest request) {
-            return null;
-        }
-
-        @Override
-        public void deleteWarmer(DeleteWarmerRequest request, 
ActionListener<DeleteWarmerResponse> listener) {
-
-        }
-
-        @Override
-        public DeleteWarmerRequestBuilder prepareDeleteWarmer() {
-            return null;
-        }
-
-        @Override
-        public void getWarmers(GetWarmersRequest request, 
ActionListener<GetWarmersResponse> listener) {
-
-        }
-
-        @Override
-        public ActionFuture<GetWarmersResponse> getWarmers(GetWarmersRequest 
request) {
-            return null;
-        }
-
-        @Override
-        public GetWarmersRequestBuilder prepareGetWarmers(String... indices) {
-            return null;
-        }
-
-        @Override
-        public void getSettings(GetSettingsRequest request, 
ActionListener<GetSettingsResponse> listener) {
-
-        }
-
-        @Override
-        public ActionFuture<GetSettingsResponse> 
getSettings(GetSettingsRequest request) {
-            return null;
-        }
-
-        @Override
-        public GetSettingsRequestBuilder prepareGetSettings(String... indices) 
{
-            return null;
-        }
-
-        @Override
-        public <Request extends ActionRequest, Response extends 
ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, 
RequestBuilder>> ActionFuture<Response> execute(Action<Request, Response, 
RequestBuilder> action, Request request) {
-            return null;
-        }
-
-        @Override
-        public <Request extends ActionRequest, Response extends 
ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, 
RequestBuilder>> void execute(Action<Request, Response, RequestBuilder> action, 
Request request, ActionListener<Response> listener) {
-
-        }
-
-        @Override
-        public <Request extends ActionRequest, Response extends 
ActionResponse, RequestBuilder extends ActionRequestBuilder<Request, Response, 
RequestBuilder>> RequestBuilder prepareExecute(Action<Request, Response, 
RequestBuilder> action) {
-            return null;
-        }
-
-        @Override
-        public ThreadPool threadPool() {
-            return null;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/e8213918/metron-platform/metron-elasticsearch/README.md
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/README.md 
b/metron-platform/metron-elasticsearch/README.md
index 7278a25..1e15018 100644
--- a/metron-platform/metron-elasticsearch/README.md
+++ b/metron-platform/metron-elasticsearch/README.md
@@ -17,6 +17,15 @@ limitations under the License.
 -->
 # Elasticsearch in Metron
 
+## Table of Contents
+
+* [Introduction](#introduction)
+* [Properties](#properties)
+* [Upgrading to 5.6.2](#upgrading-to-562)
+* [Type Mappings](#type-mappings)
+* [Using Metron with Elasticsearch 5.6.2](#using-metron-with-elasticsearch-562)
+* [Installing Elasticsearch Templates](#installing-elasticsearch-templates)
+
 ## Introduction
 
 Elasticsearch can be used as the real-time portion of the datastore resulting 
from [metron-indexing](../metron-indexing/README.md).
@@ -50,9 +59,219 @@ For instance, an `es.date.format` of `yyyy.MM.dd.HH` would 
have the consequence
 roll hourly, whereas an `es.date.format` of `yyyy.MM.dd` would have the 
consequence that the indices would
 roll daily.
 
-## Using Metron with Elasticsearch 2.x
+## Upgrading to 5.6.2
+
+Users should be prepared to re-index when migrating from Elasticsearch 2.3.3 
to 5.6.2. There are a number of template changes, most notably around
+string type handling, that may cause issues when upgrading.
+
+[https://www.elastic.co/guide/en/elasticsearch/reference/5.6/setup-upgrade.html](https://www.elastic.co/guide/en/elasticsearch/reference/5.6/setup-upgrade.html)
+
+Be aware that if you add a new string field and want to be able to filter and 
search on it from the Alerts UI, you **must** add a mapping for that 
field to
+the appropriate Elasticsearch template. Below is more detail on how to choose 
the appropriate mapping type for your string field.
+
+## Type Mappings
+
+Type mappings have changed quite a bit from ES 2.x -> 5.x. Here is a brief 
rundown of the biggest changes. More detailed references from Elasticsearch
+are provided in the [Type Mapping References](#type-mapping-references) 
section below.
+* string fields replaced by text/keyword type
+* strings have new default mappings as follows
+
+    ```
+    {
+      "type": "text",
+      "fields": {
+        "keyword": {
+          "type": "keyword",
+          "ignore_above": 256
+        }
+      }
+    }
+    ```
+
+* There is no longer a `_timestamp` field that you can set "enabled" on. This 
field now causes an exception on templates.
+Replace with an application-created timestamp of "date" type.
+
+The semantics for string types have changed. In 2.x, the "index" setting of a 
string field is either "analyzed" or "not_analyzed", which basically mean 
"full text" and "keyword", respectively.
+Analyzed text basically means the indexer will split the text using a text 
analyzer thus allowing you to search on substrings within the original text. 
"New York" is split and indexed as two buckets,
+ "New" and "York", so you can search or query for aggregate counts for those 
terms independently and will match against the individual terms "New" or 
"York." "Keyword" means that the original text
+ will not be split/analyzed during indexing and instead treated as a whole 
unit, i.e. "New" or "York" will not match in searches against the document 
containing "New York", but searching on "New York"
+ as the full city name will. In 5.x, instead of using the "index" 
setting, you now set the "type" to either "text" for full text, or "keyword" 
for keywords.
+
+Below is a table depicting the changes to how String types are now handled.
+
+<table>
+<tr>
+       <th>sort, aggregate, or access values</th>
+       <th>ES 2.x</th>
+       <th>ES 5.x</th>
+       <th>Example</th>
+</tr>
+<tr>
+       <td>no</td>
+       <td>
+<pre><code>"my_property" : {
+  "type": "string",
+  "index": "analyzed"
+}
+</code></pre>
+       </td>
+       <td>
+<pre><code>"my_property" : {
+  "type": "text"
+}
+</code></pre>
+    Additional defaults: "index": "true", "fielddata": "false"
+       </td>
+       <td>
+               "New York" handled via in-mem search as "New" and "York" 
buckets. <strong>No</strong> aggregation or sort.
+       </td>
+</tr>
+<tr>
+       <td>
+       yes
+       </td>
+       <td>
+<pre><code>"my_property": {
+  "type": "string",
+  "index": "analyzed"
+}
+</code></pre>
+       </td>
+       <td>
+<pre><code>"my_property": {
+  "type": "text",
+  "fielddata": "true"
+}
+</code></pre>
+       </td>
+       <td>
+       "New York" handled via in-mem search as "New" and "York" buckets. 
<strong>Can</strong> aggregate and sort.
+       </td>
+</tr>
+<tr>
+       <td>
+       yes
+       </td>
+       <td>
+<pre><code>"my_property": {
+  "type": "string",
+  "index": "not_analyzed"
+}
+</code></pre>
+       </td>
+       <td>
+<pre><code>"my_property" : {
+  "type": "keyword"
+}
+</code></pre>
+       </td>
+       <td>
+       "New York" searchable as single value. <strong>Can</strong> aggregate 
and sort. A search for "New" or "York" will not match against the whole value.
+       </td>
+</tr>
+<tr>
+       <td>
+       yes
+       </td>
+       <td>
+<pre><code>"my_property": {
+  "type": "string",
+  "index": "analyzed"
+}
+</code></pre>
+       </td>
+       <td>
+<pre><code>"my_property": {
+  "type": "text",
+  "fields": {
+    "keyword": {
+      "type": "keyword",
+      "ignore_above": 256
+    }
+  }
+}
+</code></pre>
+       </td>
+       <td>
+       "New York" searchable as single value or as text document, can 
aggregate and sort on the sub term "keyword."
+       </td>
+</tr>
+</table>
 
-With Elasticsearch 2.x, there is a requirement that all sensors templates have 
a nested alert field defined.  This field is a dummy field, and will be 
obsolete in Elasticsearch 5.x.  See [Ignoring Unmapped 
Fields](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-sort.html#_ignoring_unmapped_fields)
 for more information
+If you want to set default string behavior for all strings for a given index 
and type, you can do so with a mapping similar to the following (replace 
${your_type_here} accordingly):
+
+```
+# curl -XPUT 'http://${ES_HOST}:${ES_PORT}/_template/default_string_template' 
-d '
+{
+  "template": "*",
+  "mappings" : {
+    "${your_type_here}": {
+      "dynamic_templates": [
+        {
+          "strings": {
+            "match_mapping_type": "string",
+            "mapping": {
+              "type": "text"
+            }
+          }
+        }
+      ]
+    }
+  }
+}
+'
+```
+
+By specifying the "template" property with value "*" the template will apply 
to all indexes that have documents indexed of the specified type 
(${your_type_here}). This results in the following template.
+
+```
+# curl -XGET 
'http://${ES_HOST}:${ES_PORT}/_template/default_string_template?pretty'
+{
+  "default_string_template" : {
+    "order" : 0,
+    "template" : "*",
+    "settings" : { },
+    "mappings" : {
+      "${your_type_here}" : {
+        "dynamic_templates" : [
+          {
+            "strings" : {
+              "match_mapping_type" : "string",
+              "mapping" : {
+                "type" : "text"
+              }
+            }
+          }
+        ]
+      }
+    },
+    "aliases" : { }
+  }
+}
+```
+
+Notes on other settings for types in ES
+* doc_values
+    * on-disk data structure
+    * provides access for sorting, aggregation, and field values
+    * stores same values as _source, but in column-oriented fashion better for 
sorting and aggregating
+    * not supported on text fields
+    * enabled by default
+* fielddata
+    * in-memory data structure
+    * provides access for sorting, aggregation, and field values
+    * primarily for text fields
+    * disabled by default because the heap space required can be large
+
+
+##### Type Mapping References
+* 
[https://www.elastic.co/guide/en/elasticsearch/reference/5.6/mapping.html](https://www.elastic.co/guide/en/elasticsearch/reference/5.6/mapping.html)
+* 
[https://www.elastic.co/guide/en/elasticsearch/reference/5.6/breaking_50_mapping_changes.html](https://www.elastic.co/guide/en/elasticsearch/reference/5.6/breaking_50_mapping_changes.html)
+* 
[https://www.elastic.co/blog/strings-are-dead-long-live-strings](https://www.elastic.co/blog/strings-are-dead-long-live-strings)
+
+## Using Metron with Elasticsearch 5.6.2
+
+There is a requirement that all sensor templates have a nested alert field 
defined.  This field is a dummy field.  See [Ignoring Unmapped 
Fields](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-request-sort.html#_ignoring_unmapped_fields)
 for more information.
 
 Without this field, an error will be thrown during ALL searches (including 
from UIs, resulting in no alerts being found for any sensor). This error will 
be found in the REST service's logs.
 
@@ -63,7 +282,7 @@ QueryParsingException[[nested] failed to find nested object 
under path [alert]];
 
 There are two steps to resolve this issue.  First is to update the 
Elasticsearch template for each sensor, so any new indices have the field. This 
requires retrieving the template, removing an extraneous JSON field so we can 
put it back later, and adding our new field.
 
-Make sure to set the ELASTICSEARCH variable appropriately. $SENSOR can contain 
wildcards, so if rollover has occurred, it's not necessary to do each index 
individually. The example here appends `index*` to get all indexes for a the 
provided sensor.
+Make sure to set the ELASTICSEARCH variable appropriately. $SENSOR can contain 
wildcards, so if rollover has occurred, it's not necessary to do each index 
individually. The example here appends `index*` to get all indexes for the 
provided sensor.
 
 ```
 export ELASTICSEARCH="node1"
@@ -89,11 +308,11 @@ To update existing indexes, update Elasticsearch mappings 
with the new field for
 ```
 curl -XPUT 
"http://${ELASTICSEARCH}:9200/${SENSOR}_index*/_mapping/${SENSOR}_doc" -d '
 {
-        "properties" : {
-          "alert" : {
-            "type" : "nested"
-          }
-        }
+  "properties" : {
+    "alert" : {
+      "type" : "nested"
+    }
+  }
 }
 '
 rm ${SENSOR}.template

http://git-wip-us.apache.org/repos/asf/metron/blob/e8213918/metron-platform/metron-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/pom.xml 
b/metron-platform/metron-elasticsearch/pom.xml
index d924891..97f4062 100644
--- a/metron-platform/metron-elasticsearch/pom.xml
+++ b/metron-platform/metron-elasticsearch/pom.xml
@@ -176,8 +176,14 @@
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-core</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.mockito</groupId>
-            <artifactId>mockito-all</artifactId>
+            <artifactId>mockito-core</artifactId>
             <version>${global_mockito_version}</version>
             <scope>test</scope>
         </dependency>
@@ -200,6 +206,16 @@
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <version>${global_log4j_core_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <version>${global_log4j_core_version}</version>
+        </dependency>
     </dependencies>
 
     <build>

http://git-wip-us.apache.org/repos/asf/metron/blob/e8213918/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchImportExport.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchImportExport.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchImportExport.java
new file mode 100644
index 0000000..0a04dfc
--- /dev/null
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/bulk/ElasticsearchImportExport.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.bulk;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.metron.common.utils.JSONUtils;
+
+/**
+ * This is a utility for taking a file of JSON objects that were exported from 
ES and transforming
+ * it into a bulk import format. This was useful for backing up and restoring 
the Kibana dashboard
+ * index. The notable gap is that it expects one record per line in the file, 
which is not how
+ * ES generally returns results. Elasticsearch-dump was used as the 
intermediary to export data in
+ * the desired format for consumption by this tool.
+ * @see <a 
href="https://github.com/taskrabbit/elasticsearch-dump">https://github.com/taskrabbit/elasticsearch-dump</a>
+ */
+public class ElasticsearchImportExport {
+
+  public static void main(String[] args) {
+    if (args.length != 2) {
+      throw new RuntimeException("Expects 'input' and 'output' file 
arguments.");
+    }
+    final String inPath = args[0];
+    final String outPath = args[1];
+    try {
+      new ElasticsearchImportExport().bulkify(Paths.get(inPath), 
Paths.get(outPath));
+    } catch (IOException e) {
+      e.printStackTrace();
+      System.exit(1);
+    }
+    System.exit(0);
+  }
+
+  /**
+   * Takes a file of line-delimited JSON objects and converts them to an 
Elasticsearch bulk import
+   * format.
+   *
+   * @param input input JSON file (note, each line is expected to be a 
separate complete JSON
+   * object, not the file as a whole.)
+   * @param output Elasticsearch bulk import file.
+   * @throws IOException
+   */
+  public void bulkify(Path input, Path output) throws IOException {
+    List<String> outRecs = new ArrayList<String>();
+    try (BufferedReader br = new BufferedReader(new 
FileReader(input.toFile()))) {
+      String line;
+      while ((line = br.readLine()) != null) {
+        Map<String, Object> inDoc = JSONUtils.INSTANCE
+            .load(line, new TypeReference<Map<String, Object>>() {
+            });
+        Object id = inDoc.get("_id");
+        Object type = inDoc.get("_type");
+        String createRaw = String
+            .format("{ \"create\" : { \"_id\": \"%s\", \"_type\": \"%s\" } }", 
id, type);
+        String outData = JSONUtils.INSTANCE.toJSON(inDoc.get("_source"), 
false);
+        outRecs.add(createRaw);
+        outRecs.add(outData);
+      }
+    }
+    try (BufferedWriter br = new BufferedWriter(new 
FileWriter(output.toFile()))) {
+      for (String line : outRecs) {
+        br.write(line);
+        br.write(System.lineSeparator());
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/e8213918/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
index 9f21994..c12802e 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchColumnMetadataDao.java
@@ -48,7 +48,8 @@ public class ElasticsearchColumnMetadataDao implements 
ColumnMetadataDao {
   private static Map<String, FieldType> elasticsearchTypeMap;
   static {
     Map<String, FieldType> fieldTypeMap = new HashMap<>();
-    fieldTypeMap.put("string", FieldType.STRING);
+    fieldTypeMap.put("text", FieldType.TEXT);
+    fieldTypeMap.put("keyword", FieldType.KEYWORD);
     fieldTypeMap.put("ip", FieldType.IP);
     fieldTypeMap.put("integer", FieldType.INTEGER);
     fieldTypeMap.put("long", FieldType.LONG);

http://git-wip-us.apache.org/repos/asf/metron/blob/e8213918/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
index 910c09b..650462e 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
@@ -37,27 +37,32 @@ import org.apache.metron.indexing.dao.search.SearchResult;
 import org.apache.metron.indexing.dao.search.SortField;
 import org.apache.metron.indexing.dao.search.SortOrder;
 import org.apache.metron.indexing.dao.update.Document;
-import org.elasticsearch.action.ActionWriteResponse.ShardInfo;
+import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
 import org.elasticsearch.action.bulk.BulkRequestBuilder;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.elasticsearch.action.index.IndexRequest;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.action.search.SearchRequestBuilder;
+import 
org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
 import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.index.mapper.ip.IpFieldMapper;
+import org.elasticsearch.cluster.metadata.MappingMetaData;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.index.mapper.LegacyIpFieldMapper;
+import org.elasticsearch.index.query.IdsQueryBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.QueryStringQueryBuilder;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
 import org.elasticsearch.search.aggregations.Aggregation;
+import org.elasticsearch.search.aggregations.AggregationBuilders;
 import org.elasticsearch.search.aggregations.Aggregations;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
 import org.elasticsearch.search.aggregations.bucket.terms.Terms.Order;
-import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
+import 
org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
 import org.elasticsearch.search.aggregations.metrics.sum.Sum;
-import org.elasticsearch.search.aggregations.metrics.sum.SumBuilder;
+import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.sort.FieldSortBuilder;
 import org.slf4j.Logger;
@@ -124,8 +129,27 @@ public class ElasticsearchDao implements IndexDao {
     //uninitialized.
   }
 
+  private static Map<String, FieldType> elasticsearchSearchTypeMap;
+
+  static {
+    Map<String, FieldType> fieldTypeMap = new HashMap<>();
+    fieldTypeMap.put("text", FieldType.TEXT);
+    fieldTypeMap.put("keyword", FieldType.KEYWORD);
+    fieldTypeMap.put("ip", FieldType.IP);
+    fieldTypeMap.put("integer", FieldType.INTEGER);
+    fieldTypeMap.put("long", FieldType.LONG);
+    fieldTypeMap.put("date", FieldType.DATE);
+    fieldTypeMap.put("float", FieldType.FLOAT);
+    fieldTypeMap.put("double", FieldType.DOUBLE);
+    fieldTypeMap.put("boolean", FieldType.BOOLEAN);
+    elasticsearchSearchTypeMap = Collections.unmodifiableMap(fieldTypeMap);
+  }
+
   @Override
   public SearchResponse search(SearchRequest searchRequest) throws 
InvalidSearchException {
+    if(searchRequest.getQuery() == null) {
+      throw new InvalidSearchException("Search query is invalid: null");
+    }
     return search(searchRequest, new 
QueryStringQueryBuilder(searchRequest.getQuery()));
   }
 
@@ -162,14 +186,15 @@ public class ElasticsearchDao implements IndexDao {
   private org.elasticsearch.action.search.SearchRequest buildSearchRequest(
           SearchRequest searchRequest,
           QueryBuilder queryBuilder) throws InvalidSearchException {
-
-    LOG.debug("Got search request; request={}", 
ElasticsearchUtils.toJSON(searchRequest).orElse("???"));
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Got search request; request={}", 
ElasticsearchUtils.toJSON(searchRequest).orElse("???"));
+    }
     SearchSourceBuilder searchBuilder = new SearchSourceBuilder()
             .size(searchRequest.getSize())
             .from(searchRequest.getFrom())
             .query(queryBuilder)
             .trackScores(true);
-
+    Optional<List<String>> fields = searchRequest.getFields();
     // column metadata needed to understand the type of each sort field
     Map<String, FieldType> meta;
     try {
@@ -202,24 +227,30 @@ public class ElasticsearchDao implements IndexDao {
     }
 
     // handle search fields
-    if (searchRequest.getFields().isPresent()) {
-      searchBuilder.fields(searchRequest.getFields().get());
+    if (fields.isPresent()) {
+      searchBuilder.fetchSource("*", null);
     } else {
       searchBuilder.fetchSource(true);
     }
 
+    Optional<List<String>> facetFields = searchRequest.getFacetFields();
+
     // handle facet fields
     if (searchRequest.getFacetFields().isPresent()) {
+      // 
https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/_bucket_aggregations.html
       for(String field : searchRequest.getFacetFields().get()) {
-        String name = getFacentAggregationName(field);
-        TermsBuilder terms = new TermsBuilder(name).field(field);
+        String name = getFacetAggregationName(field);
+        TermsAggregationBuilder terms = AggregationBuilders.terms( 
name).field(field);
+               // new TermsBuilder(name).field(field);
         searchBuilder.aggregation(terms);
       }
     }
 
     // return the search request
     String[] indices = wildcardIndices(searchRequest.getIndices());
-    LOG.debug("Built Elasticsearch request; indices={}, request={}", indices, 
searchBuilder.toString());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Built Elasticsearch request; indices={}, request={}", 
indices, searchBuilder.toString());
+    }
     return new org.elasticsearch.action.search.SearchRequest()
             .indices(indices)
             .source(searchBuilder);
@@ -240,12 +271,13 @@ public class ElasticsearchDao implements IndexDao {
           org.elasticsearch.action.search.SearchResponse esResponse) throws 
InvalidSearchException {
 
     SearchResponse searchResponse = new SearchResponse();
+
     searchResponse.setTotal(esResponse.getHits().getTotalHits());
 
     // search hits --> search results
     List<SearchResult> results = new ArrayList<>();
     for(SearchHit hit: esResponse.getHits().getHits()) {
-      results.add(getSearchResult(hit, searchRequest.getFields().isPresent()));
+      results.add(getSearchResult(hit, searchRequest.getFields()));
     }
     searchResponse.setResults(results);
 
@@ -263,7 +295,9 @@ public class ElasticsearchDao implements IndexDao {
       searchResponse.setFacetCounts(getFacetCounts(facetFields, 
esResponse.getAggregations(), commonColumnMetadata ));
     }
 
-    LOG.debug("Built search response; response={}", 
ElasticsearchUtils.toJSON(searchResponse).orElse("???"));
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Built search response; response={}", 
ElasticsearchUtils.toJSON(searchResponse).orElse("???"));
+    }
     return searchResponse;
   }
 
@@ -309,7 +343,7 @@ public class ElasticsearchDao implements IndexDao {
           QueryBuilder queryBuilder) {
 
     // handle groups
-    TermsBuilder groups = getGroupsTermBuilder(groupRequest, 0);
+    TermsAggregationBuilder groups = getGroupsTermBuilder(groupRequest, 0);
     final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
             .query(queryBuilder)
             .aggregation(groups);
@@ -446,16 +480,24 @@ public class ElasticsearchDao implements IndexDao {
    */
   <T> List<T> searchByGuids(Collection<String> guids, Collection<String> 
sensorTypes,
       Function<SearchHit, Optional<T>> callback) {
-    QueryBuilder query;
+    if(guids == null || guids.isEmpty()) {
+      return Collections.EMPTY_LIST;
+    }
+    QueryBuilder query = null;
+    IdsQueryBuilder idsQuery = null;
     if (sensorTypes != null) {
       String[] types = sensorTypes.stream().map(sensorType -> sensorType + 
"_doc").toArray(String[]::new);
-      query = QueryBuilders.idsQuery(types).ids(guids);
+      idsQuery = QueryBuilders.idsQuery(types);
     } else {
-      query = QueryBuilders.idsQuery().ids(guids);
+      idsQuery = QueryBuilders.idsQuery();
     }
+
+    for(String guid : guids) {
+        query = idsQuery.addIds(guid);
+    }
+
     SearchRequestBuilder request = client.prepareSearch()
                                          .setQuery(query)
-                                         .setSource("message")
                                          .setSize(guids.size())
                                          ;
     org.elasticsearch.action.search.SearchResponse response = request.get();
@@ -569,7 +611,7 @@ public class ElasticsearchDao implements IndexDao {
     Map<String, Map<String, Long>> fieldCounts = new HashMap<>();
     for (String field: fields) {
       Map<String, Long> valueCounts = new HashMap<>();
-      Aggregation aggregation = 
aggregations.get(getFacentAggregationName(field));
+      Aggregation aggregation = 
aggregations.get(getFacetAggregationName(field));
       if (aggregation instanceof Terms) {
         Terms terms = (Terms) aggregation;
         terms.getBuckets().stream().forEach(bucket -> 
valueCounts.put(formatKey(bucket.getKey(), commonColumnMetadata.get(field)), 
bucket.getDocCount()));
@@ -580,8 +622,8 @@ public class ElasticsearchDao implements IndexDao {
   }
 
   private String formatKey(Object key, FieldType type) {
-    if (FieldType.IP.equals(type)) {
-      return IpFieldMapper.longToIp((Long) key);
+    if (FieldType.IP.equals(type) && key instanceof Long) {
+      return LegacyIpFieldMapper.longToIp((Long) key);
     } else if (FieldType.BOOLEAN.equals(type)) {
       return (Long) key == 1 ? "true" : "false";
     } else {
@@ -589,11 +631,12 @@ public class ElasticsearchDao implements IndexDao {
     }
   }
 
-  private TermsBuilder getGroupsTermBuilder(GroupRequest groupRequest, int 
index) {
+  private TermsAggregationBuilder getGroupsTermBuilder(GroupRequest 
groupRequest, int index) {
     List<Group> groups = groupRequest.getGroups();
     Group group = groups.get(index);
     String aggregationName = getGroupByAggregationName(group.getField());
-    TermsBuilder termsBuilder = new TermsBuilder(aggregationName)
+    TermsAggregationBuilder termsBuilder = 
AggregationBuilders.terms(aggregationName);
+    termsBuilder
         .field(group.getField())
         .size(accessConfig.getMaxSearchGroups())
         .order(getElasticsearchGroupOrder(group.getOrder()));
@@ -602,7 +645,8 @@ public class ElasticsearchDao implements IndexDao {
     }
     Optional<String> scoreField = groupRequest.getScoreField();
     if (scoreField.isPresent()) {
-      termsBuilder.subAggregation(new 
SumBuilder(getSumAggregationName(scoreField.get())).field(scoreField.get()).missing(0));
+      SumAggregationBuilder scoreSumAggregationBuilder = 
AggregationBuilders.sum(getSumAggregationName(scoreField.get())).field(scoreField.get()).missing(0);
+      termsBuilder.subAggregation(scoreSumAggregationBuilder);
     }
     return termsBuilder;
   }
@@ -630,14 +674,15 @@ public class ElasticsearchDao implements IndexDao {
     return searchResultGroups;
   }
 
-  private SearchResult getSearchResult(SearchHit searchHit, boolean 
fieldsPresent) {
+  private SearchResult getSearchResult(SearchHit searchHit, 
Optional<List<String>> fields) {
     SearchResult searchResult = new SearchResult();
     searchResult.setId(searchHit.getId());
     Map<String, Object> source;
-    if (fieldsPresent) {
+    if (fields.isPresent()) {
+      Map<String, Object> resultSourceAsMap = searchHit.getSourceAsMap();
       source = new HashMap<>();
-      searchHit.getFields().forEach((key, value) -> {
-        source.put(key, value.getValues().size() == 1 ? value.getValue() : 
value.getValues());
+      fields.get().forEach(field -> {
+        source.put(field, resultSourceAsMap.get(field));
       });
     } else {
       source = searchHit.getSource();
@@ -648,7 +693,7 @@ public class ElasticsearchDao implements IndexDao {
     return searchResult;
   }
 
-  private String getFacentAggregationName(String field) {
+  private String getFacetAggregationName(String field) {
     return String.format("%s_count", field);
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/e8213918/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
index f8fb145..9740272 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchMetaAlertDao.java
@@ -27,19 +27,11 @@ import static 
org.elasticsearch.index.query.QueryBuilders.termQuery;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
 import java.util.stream.Collectors;
+import org.apache.commons.collections4.SetUtils;
+import org.apache.lucene.search.join.ScoreMode;
 import org.apache.metron.common.Constants;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.IndexDao;
@@ -59,6 +51,22 @@ import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.search.SearchResult;
 import org.apache.metron.indexing.dao.update.Document;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.get.MultiGetItemResponse;
+import org.elasticsearch.action.get.MultiGetRequest.Item;
+import org.elasticsearch.action.get.MultiGetRequestBuilder;
+import org.elasticsearch.action.get.MultiGetResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchRequestBuilder;
+import 
org.elasticsearch.action.support.replication.ReplicationResponse.ShardInfo;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.action.update.UpdateResponse;
+import org.elasticsearch.common.xcontent.XContentBuilder;
+import org.elasticsearch.index.query.InnerHitBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.index.query.QueryStringQueryBuilder;
+import org.elasticsearch.search.SearchHit;
 import org.apache.metron.indexing.dao.update.OriginalNotFoundException;
 import org.apache.metron.indexing.dao.update.PatchRequest;
 import org.apache.metron.stellar.common.utils.ConversionUtils;
@@ -66,7 +74,6 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.index.query.QueryBuilder;
 import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.index.query.QueryStringQueryBuilder;
-import org.elasticsearch.index.query.support.QueryInnerHitBuilder;
 
 public class ElasticsearchMetaAlertDao implements MetaAlertDao {
 
@@ -163,8 +170,9 @@ public class ElasticsearchMetaAlertDao implements 
MetaAlertDao {
             nestedQuery(
                 ALERT_FIELD,
                 boolQuery()
-                    .must(termQuery(ALERT_FIELD + "." + GUID, guid))
-            ).innerHit(new QueryInnerHitBuilder())
+                    .must(termQuery(ALERT_FIELD + "." + GUID, guid)),
+                    ScoreMode.None
+            ).innerHit(new InnerHitBuilder())
         )
         .must(termQuery(STATUS_FIELD, 
MetaAlertStatus.ACTIVE.getStatusString()));
     return queryAllResults(qb);
@@ -379,7 +387,8 @@ public class ElasticsearchMetaAlertDao implements 
MetaAlertDao {
             .should(new QueryStringQueryBuilder(searchRequest.getQuery()))
             .should(nestedQuery(
                 ALERT_FIELD,
-                new QueryStringQueryBuilder(searchRequest.getQuery())
+                new QueryStringQueryBuilder(searchRequest.getQuery()),
+                ScoreMode.None
                 )
             )
         )
@@ -486,8 +495,9 @@ public class ElasticsearchMetaAlertDao implements 
MetaAlertDao {
             nestedQuery(
                 ALERT_FIELD,
                 boolQuery()
-                    .must(termQuery(ALERT_FIELD + "." + GUID, alertGuid))
-            ).innerHit(new QueryInnerHitBuilder())
+                    .must(termQuery(ALERT_FIELD + "." + Constants.GUID, 
alertGuid)),
+                ScoreMode.None
+            ).innerHit(new InnerHitBuilder())
         )
         .must(termQuery(STATUS_FIELD, 
MetaAlertStatus.ACTIVE.getStatusString()));
     return queryAllResults(qb);
@@ -504,7 +514,7 @@ public class ElasticsearchMetaAlertDao implements 
MetaAlertDao {
     SearchRequestBuilder searchRequestBuilder = elasticsearchDao
         .getClient()
         .prepareSearch(index)
-        .addFields("*")
+        .addStoredField("*")
         .setFetchSource(true)
         .setQuery(qb)
         .setSize(pageSize);
@@ -586,6 +596,54 @@ public class ElasticsearchMetaAlertDao implements 
MetaAlertDao {
     } // else we have no updates, so don't do anything
   }
 
+
+  /**
+   * Collects the source documents of the alerts referenced by a meta alert.
+   * Looks up the latest stored version of the meta alert identified by the update's GUID,
+   * reads the GUID of every entry under its alert field, and runs an ids query for those
+   * GUIDs through the Elasticsearch client.
+   *
+   * NOTE(review): the search request names no index and sets no size, so the client's
+   * default page size presumably applies and could truncate long alert lists - confirm.
+   * NOTE(review): assumes the latest document always holds a list under the alert field;
+   * a missing field would fail in the loop - confirm against callers.
+   *
+   * @param update The meta alert update; only its GUID is read here
+   * @return The source maps of the matching search hits, or an empty list when no latest
+   *     version of the meta alert exists
+   * @throws IOException Declared on the signature; presumably propagated from the
+   *     latest-document lookup - TODO confirm
+   */
+  @SuppressWarnings("unchecked") // covers the cast of the alert field to a list of maps
+  protected List<Map<String, Object>> getAllAlertsForMetaAlert(Document 
update) throws IOException {
+    Document latest = indexDao.getLatest(update.getGuid(), 
MetaAlertDao.METAALERT_TYPE);
+    if (latest == null) {
+      return new ArrayList<>(); // no stored meta alert, so there is nothing to look up
+    }
+    List<String> guids = new ArrayList<>();
+    List<Map<String, Object>> latestAlerts = (List<Map<String, Object>>) 
latest.getDocument()
+        .get(MetaAlertDao.ALERT_FIELD);
+    for (Map<String, Object> alert : latestAlerts) {
+      guids.add((String) alert.get(Constants.GUID));
+    }
+
+    List<Map<String, Object>> alerts = new ArrayList<>();
+    QueryBuilder query = QueryBuilders.idsQuery().addIds(guids.toArray(new 
String[0]));
+    SearchRequestBuilder request = elasticsearchDao.getClient().prepareSearch()
+        .setQuery(query);
+    org.elasticsearch.action.search.SearchResponse response = request.get();
+    for (SearchHit hit : response.getHits().getHits()) {
+      alerts.add(hit.sourceAsMap()); // keep only the document source of each hit
+    }
+    return alerts;
+  }
+
+  /**
+   * Builds an update Document for updating the meta alerts list.
+   * The document body holds a single entry: the given list stored under
+   * {@code MetaAlertDao.METAALERT_FIELD}.
+   * @param alertGuid The GUID of the alert to update
+   * @param sensorType The sensor type to update
+   * @param metaAlertField The new metaAlertList to use
+   * @param timestamp The timestamp passed through to the update Document
+   * @return The update Document
+   */
+  protected Document buildAlertUpdate(String alertGuid, String sensorType,
+      List<String> metaAlertField, Long timestamp) {
+    Document alertUpdate;
+    Map<String, Object> document = new HashMap<>();
+    document.put(MetaAlertDao.METAALERT_FIELD, metaAlertField);
+    alertUpdate = new Document(
+        document,
+        alertGuid,
+        sensorType,
+        timestamp
+    );
+    return alertUpdate;
+  }
+
+
   @Override
   public Map<String, FieldType> getColumnMetadata(List<String> indices)
       throws IOException {

http://git-wip-us.apache.org/repos/asf/metron/blob/e8213918/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
index f29012a..4b73b84 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
@@ -17,19 +17,11 @@
  */
 package org.apache.metron.elasticsearch.utils;
 
+import static java.lang.String.format;
+
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
-import org.apache.commons.lang.StringUtils;
-import org.apache.metron.common.configuration.writer.WriterConfiguration;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.elasticsearch.client.transport.TransportClient;
-import org.elasticsearch.common.settings.Settings;
-import org.elasticsearch.common.transport.InetSocketTransportAddress;
-import org.elasticsearch.common.xcontent.XContentHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.lang.invoke.MethodHandles;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -41,8 +33,18 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-
-import static java.lang.String.format;
+import org.apache.commons.lang.StringUtils;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.netty.utils.NettyRuntimeWrapper;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.elasticsearch.common.xcontent.XContentHelper;
+import org.elasticsearch.transport.client.PreBuiltTransportClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ElasticsearchUtils {
 
@@ -106,7 +108,7 @@ public class ElasticsearchUtils {
   }
 
   public static TransportClient getClient(Map<String, Object> 
globalConfiguration, Map<String, String> optionalSettings) {
-    Settings.Builder settingsBuilder = Settings.settingsBuilder();
+    Settings.Builder settingsBuilder = Settings.builder();
     settingsBuilder.put("cluster.name", 
globalConfiguration.get("es.clustername"));
     settingsBuilder.put("client.transport.ping_timeout","500s");
     if (optionalSettings != null) {
@@ -115,7 +117,13 @@ public class ElasticsearchUtils {
     Settings settings = settingsBuilder.build();
     TransportClient client;
     try{
-      client = TransportClient.builder().settings(settings).build();
+      LOG.info("Number of available processors in Netty: {}", 
NettyRuntimeWrapper.availableProcessors());
+      // Netty sets available processors statically and if an attempt is made to set it more than
+      // once an IllegalStateException is thrown by NettyRuntime.setAvailableProcessors(NettyRuntime.java:87)
+      // https://discuss.elastic.co/t/getting-availableprocessors-is-already-set-to-1-rejecting-1-illegalstateexception-exception/103082
+      // https://discuss.elastic.co/t/elasticsearch-5-4-1-availableprocessors-is-already-set/88036
+      System.setProperty("es.set.netty.runtime.available.processors", "false");
+      client = new PreBuiltTransportClient(settings);
       for(HostnamePort hp : getIps(globalConfiguration)) {
         client.addTransportAddress(
                 new 
InetSocketTransportAddress(InetAddress.getByName(hp.hostname), hp.port)
@@ -196,9 +204,10 @@ public class ElasticsearchUtils {
   public static Optional<String> 
toJSON(org.elasticsearch.action.search.SearchRequest esRequest) {
     Optional<String> json = Optional.empty();
 
-    if(esRequest != null) {
+    if(esRequest != null && esRequest.source() != null) {
       try {
-        json = Optional.of(XContentHelper.convertToJson(esRequest.source(), 
true));
+        BytesReference requestBytes = esRequest.source().buildAsBytes();
+        json = Optional.of(XContentHelper.convertToJson(requestBytes, true));
 
       } catch (Throwable t) {
         LOG.error("Failed to convert search request to JSON", t);

http://git-wip-us.apache.org/repos/asf/metron/blob/e8213918/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
index bc9eccc..143bcf7 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
@@ -17,6 +17,12 @@
  */
 package org.apache.metron.elasticsearch.writer;
 
+import java.io.Serializable;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.common.interfaces.FieldNameConverter;
@@ -34,13 +40,6 @@ import org.json.simple.JSONObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Serializable;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
 public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, 
Serializable {
 
   private Map<String, String> optionalSettings;

http://git-wip-us.apache.org/repos/asf/metron/blob/e8213918/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchImportExportTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchImportExportTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchImportExportTest.java
new file mode 100644
index 0000000..ddec27c
--- /dev/null
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/bulk/ElasticsearchImportExportTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.elasticsearch.bulk;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.integration.utils.TestUtils;
+import org.junit.Before;
+import org.junit.Test;
+/**
+ * Checks that ElasticsearchImportExport.bulkify turns a file of exported records into the
+ * expected bulk import text.
+ */
+public class ElasticsearchImportExportTest {
+
+
+  /**
+   
*{"_index":".kibana","_type":"visualization","_id":"AV-Sj0e2hKs1cXXnFMqF","_score":1,"_source":{"title":"Welcome
 to Apache Metron"}}
+   
*{"_index":".kibana","_type":"blah","_id":"MIKE-AV-Sj0e2hKs1cXXnFMqF","_score":1,"_source":{"title":"another
 Welcome to Apache Metron"}}
+   */
+  @Multiline
+  private static String records; // input records; presumably filled from the Javadoc above by @Multiline - confirm
+
+  /**
+   *{ "create" : { "_id": "AV-Sj0e2hKs1cXXnFMqF", "_type": "visualization" } }
+   *{"title":"Welcome to Apache Metron"}
+   *{ "create" : { "_id": "MIKE-AV-Sj0e2hKs1cXXnFMqF", "_type": "blah" } }
+   *{"title":"another Welcome to Apache Metron"}
+   */
+  @Multiline
+  private static String expected; // expected output; presumably filled from the Javadoc above by @Multiline - confirm
+  private File tempDir; // working directory for the input and output files of each test
+
+  @Before
+  public void setup() throws Exception {
+    tempDir = TestUtils.createTempDir(this.getClass().getName());
+  }
+
+  @Test
+  public void 
bulk_exporter_writes_elasticsearch_records_in_bulk_import_format() throws 
Exception {
+    Path recordsFile = Paths.get(tempDir.getPath(), "inputfile.json");
+    Path outputFile = Paths.get(tempDir.getPath(), "outputfile.json");
+    TestUtils.write(recordsFile.toFile(), records); // seed the input file with the records text
+
+    ElasticsearchImportExport tool = new ElasticsearchImportExport();
+    tool.bulkify(recordsFile, outputFile);
+    String actual = TestUtils.read(outputFile.toFile());
+    assertThat(actual, equalTo(expected)); // the written file must equal the expected text exactly
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/e8213918/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
index a6c0aa6..2a6fb4f 100644
--- 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchDaoTest.java
@@ -17,14 +17,23 @@
  */
 package org.apache.metron.elasticsearch.dao;
 
-import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
 import org.apache.metron.indexing.dao.AccessConfig;
-import org.apache.metron.indexing.dao.search.FieldType;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.search.SortField;
 import org.apache.metron.indexing.dao.search.SortOrder;
+import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
+import org.apache.metron.indexing.dao.search.FieldType;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
@@ -35,18 +44,10 @@ import org.json.simple.parser.JSONParser;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 public class ElasticsearchDaoTest {
 
@@ -103,7 +104,7 @@ public class ElasticsearchDaoTest {
 
     // setup the column metadata
     Map<String, FieldType> columnMetadata = new HashMap<>();
-    columnMetadata.put("sortByStringDesc", FieldType.STRING);
+    columnMetadata.put("sortByStringDesc", FieldType.TEXT);
     columnMetadata.put("sortByIntAsc", FieldType.INTEGER);
 
     // setup the dao
@@ -148,7 +149,7 @@ public class ElasticsearchDaoTest {
       JSONObject sortBy = (JSONObject) aSortField.get("sortByStringDesc");
       assertEquals("desc", sortBy.get("order"));
       assertEquals("_last", sortBy.get("missing"));
-      assertEquals("string", sortBy.get("unmapped_type"));
+      assertEquals("text", sortBy.get("unmapped_type"));
     }
     {
       // sort by integer ascending
@@ -217,7 +218,7 @@ public class ElasticsearchDaoTest {
 
     SearchRequest searchRequest = new SearchRequest();
     searchRequest.setSize(maxSearchResults+1);
-
+    searchRequest.setQuery("");
     dao.search(searchRequest);
     // exception expected - size > max
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/e8213918/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
index 26f5fff..07019c3 100644
--- 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/dao/ElasticsearchRequestSubmitterTest.java
@@ -24,6 +24,7 @@ import org.elasticsearch.action.search.SearchRequest;
 import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.index.Index;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchShardTarget;
 import org.junit.Test;
@@ -56,7 +57,7 @@ public class ElasticsearchRequestSubmitterTest {
 
     // mocks
     SearchResponse response = mock(SearchResponse.class);
-    SearchRequest request = mock(SearchRequest.class);
+    SearchRequest request = new SearchRequest();
 
     // response will have status of OK and no failed shards
     when(response.status()).thenReturn(RestStatus.OK);
@@ -74,7 +75,7 @@ public class ElasticsearchRequestSubmitterTest {
 
     // mocks
     SearchResponse response = mock(SearchResponse.class);
-    SearchRequest request = mock(SearchRequest.class);
+    SearchRequest request = new SearchRequest();
 
     // response will have status of OK
     when(response.status()).thenReturn(RestStatus.PARTIAL_CONTENT);
@@ -90,9 +91,9 @@ public class ElasticsearchRequestSubmitterTest {
   public void searchShouldHandleShardFailure() throws InvalidSearchException {
     // mocks
     SearchResponse response = mock(SearchResponse.class);
-    SearchRequest request = mock(SearchRequest.class);
+    SearchRequest request = new SearchRequest();
     ShardSearchFailure fail = mock(ShardSearchFailure.class);
-    SearchShardTarget target = mock(SearchShardTarget.class);
+    SearchShardTarget target = new SearchShardTarget("node1", 
mock(Index.class), 1, "metron");
 
     // response will have status of OK
     when(response.status()).thenReturn(RestStatus.OK);
@@ -107,7 +108,6 @@ public class ElasticsearchRequestSubmitterTest {
 
     // shard failure needs to report the node
     when(fail.shard()).thenReturn(target);
-    when(target.getNodeId()).thenReturn("node1");
 
     // shard failure needs to report details of failure
     when(fail.index()).thenReturn("bro_index_2017-10-11");

Reply via email to