http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java index 325d42e..dd29af3 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java @@ -19,203 +19,80 @@ package org.apache.metron.elasticsearch.integration; import com.fasterxml.jackson.core.JsonProcessingException; import com.google.common.collect.Iterables; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Result; -import org.apache.metron.common.Constants; +import java.io.File; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.elasticsearch.dao.ElasticsearchDao; import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent; -import org.apache.metron.hbase.mock.MockHTable; -import org.apache.metron.hbase.mock.MockHBaseTableProvider; -import org.apache.metron.indexing.dao.*; -import org.apache.metron.indexing.dao.update.Document; -import org.apache.metron.indexing.dao.update.ReplaceRequest; -import org.elasticsearch.action.search.SearchResponse; -import 
org.elasticsearch.index.query.QueryBuilders; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.File; -import java.text.SimpleDateFormat; -import java.util.*; +import org.apache.metron.indexing.dao.IndexDao; +import org.apache.metron.indexing.dao.UpdateIntegrationTest; +import org.apache.metron.integration.InMemoryComponent; +public class ElasticsearchUpdateIntegrationTest extends UpdateIntegrationTest { -public class ElasticsearchUpdateIntegrationTest { - private static final int MAX_RETRIES = 10; - private static final int SLEEP_MS = 500; private static final String SENSOR_NAME= "test"; - private static final String TABLE_NAME = "modifications"; - private static final String CF = "p"; private static String indexDir = "target/elasticsearch_mutation"; private static String dateFormat = "yyyy.MM.dd.HH"; private static String index = SENSOR_NAME + "_index_" + new SimpleDateFormat(dateFormat).format(new Date()); - private static MockHTable table; - private static IndexDao esDao; - private static IndexDao hbaseDao; - private static MultiIndexDao dao; private static ElasticSearchComponent es; - @BeforeClass - public static void setup() throws Exception { - Configuration config = HBaseConfiguration.create(); - MockHBaseTableProvider tableProvider = new MockHBaseTableProvider(); - tableProvider.addToCache(TABLE_NAME, CF); - table = (MockHTable)tableProvider.getTable(config, TABLE_NAME); - // setup the client - es = new ElasticSearchComponent.Builder() - .withHttpPort(9211) - .withIndexDir(new File(indexDir)) - .build(); - es.start(); + @Override + protected String getIndexName() { + return SENSOR_NAME + "_index_" + new SimpleDateFormat(dateFormat).format(new Date()); + } - hbaseDao = new HBaseDao(); - AccessConfig accessConfig = new AccessConfig(); - accessConfig.setTableProvider(tableProvider); - Map<String, Object> globalConfig = new HashMap<String, Object>() {{ + @Override + protected 
Map<String, Object> createGlobalConfig() throws Exception { + return new HashMap<String, Object>() {{ put("es.clustername", "metron"); put("es.port", "9300"); put("es.ip", "localhost"); put("es.date.format", dateFormat); - put(HBaseDao.HBASE_TABLE, TABLE_NAME); - put(HBaseDao.HBASE_CF, CF); }}; - accessConfig.setGlobalConfigSupplier(() -> globalConfig); - - esDao = new ElasticsearchDao(); - - dao = new MultiIndexDao(hbaseDao, esDao); - dao.init(accessConfig); + } + @Override + protected IndexDao createDao() throws Exception { + return new ElasticsearchDao(); } - @AfterClass - public static void teardown() { - if(es != null) { - es.stop(); - } + @Override + protected InMemoryComponent startIndex() throws Exception { + es = new ElasticSearchComponent.Builder() + .withHttpPort(9211) + .withIndexDir(new File(indexDir)) + .build(); + es.start(); + return es; } + @Override + protected void loadTestData() throws Exception { + } - @Test - public void test() throws Exception { - List<Map<String, Object>> inputData = new ArrayList<>(); - for(int i = 0; i < 10;++i) { - final String name = "message" + i; - inputData.add( - new HashMap<String, Object>() {{ - put("source:type", SENSOR_NAME); - put("name" , name); - put("timestamp", System.currentTimeMillis()); - put(Constants.GUID, name); - }} - ); - } + @Override + protected void addTestData(String indexName, String sensorType, + List<Map<String, Object>> docs) throws Exception { es.add(index, SENSOR_NAME - , Iterables.transform(inputData, - m -> { - try { - return JSONUtils.INSTANCE.toJSON(m, true); - } catch (JsonProcessingException e) { - throw new IllegalStateException(e.getMessage(), e); - } - } - ) + , Iterables.transform(docs, + m -> { + try { + return JSONUtils.INSTANCE.toJSON(m, true); + } catch (JsonProcessingException e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + ) ); - List<Map<String,Object>> docs = null; - for(int t = 0;t < MAX_RETRIES;++t, Thread.sleep(SLEEP_MS)) { - docs = 
es.getAllIndexedDocs(index, SENSOR_NAME + "_doc"); - if(docs.size() >= 10) { - break; - } - } - Assert.assertEquals(10, docs.size()); - //modify the first message and add a new field - { - Map<String, Object> message0 = new HashMap<String, Object>(inputData.get(0)) {{ - put("new-field", "metron"); - }}; - String guid = "" + message0.get(Constants.GUID); - dao.replace(new ReplaceRequest(){{ - setReplacement(message0); - setGuid(guid); - setSensorType(SENSOR_NAME); - }}, Optional.empty()); - - Assert.assertEquals(1, table.size()); - Document doc = dao.getLatest(guid, SENSOR_NAME); - Assert.assertEquals(message0, doc.getDocument()); - { - //ensure hbase is up to date - Get g = new Get(HBaseDao.Key.toBytes(new HBaseDao.Key(guid, SENSOR_NAME))); - Result r = table.get(g); - NavigableMap<byte[], byte[]> columns = r.getFamilyMap(CF.getBytes()); - Assert.assertEquals(1, columns.size()); - Assert.assertEquals(message0 - , JSONUtils.INSTANCE.load(new String(columns.lastEntry().getValue()) - , JSONUtils.MAP_SUPPLIER) - ); - } - { - //ensure ES is up-to-date - long cnt = 0; - for (int t = 0; t < MAX_RETRIES && cnt == 0; ++t, Thread.sleep(SLEEP_MS)) { - docs = es.getAllIndexedDocs(index, SENSOR_NAME + "_doc"); - cnt = docs - .stream() - .filter(d -> message0.get("new-field").equals(d.get("new-field"))) - .count(); - } - Assert.assertNotEquals("Elasticsearch is not updated!", cnt, 0); - } - } - //modify the same message and modify the new field - { - Map<String, Object> message0 = new HashMap<String, Object>(inputData.get(0)) {{ - put("new-field", "metron2"); - }}; - String guid = "" + message0.get(Constants.GUID); - dao.replace(new ReplaceRequest(){{ - setReplacement(message0); - setGuid(guid); - setSensorType(SENSOR_NAME); - }}, Optional.empty()); - Assert.assertEquals(1, table.size()); - Document doc = dao.getLatest(guid, SENSOR_NAME); - Assert.assertEquals(message0, doc.getDocument()); - { - //ensure hbase is up to date - Get g = new Get(HBaseDao.Key.toBytes(new 
HBaseDao.Key(guid, SENSOR_NAME))); - Result r = table.get(g); - NavigableMap<byte[], byte[]> columns = r.getFamilyMap(CF.getBytes()); - Assert.assertEquals(2, columns.size()); - Assert.assertEquals(message0, JSONUtils.INSTANCE.load(new String(columns.lastEntry().getValue()) - , JSONUtils.MAP_SUPPLIER) - ); - Assert.assertNotEquals(message0, JSONUtils.INSTANCE.load(new String(columns.firstEntry().getValue()) - , JSONUtils.MAP_SUPPLIER) - ); - } - { - //ensure ES is up-to-date - long cnt = 0; - for (int t = 0; t < MAX_RETRIES && cnt == 0; ++t,Thread.sleep(SLEEP_MS)) { - docs = es.getAllIndexedDocs(index, SENSOR_NAME + "_doc"); - cnt = docs - .stream() - .filter(d -> message0.get("new-field").equals(d.get("new-field"))) - .count(); - } - - Assert.assertNotEquals("Elasticsearch is not updated!", cnt, 0); - } - } } - - + @Override + protected List<Map<String, Object>> getIndexedTestData(String indexName, String sensorType) throws Exception { + return es.getAllIndexedDocs(index, SENSOR_NAME + "_doc"); + } }
http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/ColumnMetadataDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/ColumnMetadataDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/ColumnMetadataDao.java new file mode 100644 index 0000000..3610574 --- /dev/null +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/ColumnMetadataDao.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.indexing.dao; + +import org.apache.metron.indexing.dao.search.FieldType; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * Responsible for retrieving column-level metadata about search indices. + */ +public interface ColumnMetadataDao { + + /** + * Retrieves column metadata for one or more search indices. + * @param indices The search indices to retrieve column metadata for. + * @return The column metadata, one set for each search index. 
+ * @throws IOException + */ + Map<String, FieldType> getColumnMetadata(List<String> indices) throws IOException; +} http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchDao.java new file mode 100644 index 0000000..eee91ae --- /dev/null +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchDao.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.indexing.dao.search; + +import java.io.IOException; +import java.util.List; +import org.apache.metron.indexing.dao.update.Document; + +public interface SearchDao { + + SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException; + + GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException; + + Document getLatest(String guid, String sensorType) throws IOException; + + Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/UpdateDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/UpdateDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/UpdateDao.java new file mode 100644 index 0000000..ca21b62 --- /dev/null +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/UpdateDao.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.indexing.dao.update; + +import java.io.IOException; +import java.util.Map; +import java.util.Optional; + +public interface UpdateDao { + + void update(Document update, Optional<String> index) throws IOException; + + void batchUpdate(Map<Document, Optional<String>> updates) throws IOException; + +} http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java index b40db46..b0d0b97 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java @@ -19,7 +19,6 @@ package org.apache.metron.indexing.dao; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -60,8 +59,8 @@ public abstract class SearchIntegrationTest { /** * [ - * {"source:type": "snort", "ip_src_addr":"192.168.1.6", "ip_src_port": 8005, "long_field": 10000, "timestamp":6, "latitude": 48.5839, "score": 50.0, "is_alert":false, "location_point": "50.0,7.7455", "snort_field": 10, "duplicate_name_field": 1, "guid":"snort_1", "threat:triage:score":"10"}, - * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8004, "long_field": 10000, "timestamp":7, "latitude": 48.5839, "score": 10.0, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 20, "duplicate_name_field": 2, "guid":"snort_2", "threat:triage:score":"20"}, + * {"source:type": "snort", "ip_src_addr":"192.168.1.6", "ip_src_port": 8005, "long_field": 10000, 
"timestamp":6, "latitude": 48.5839, "score": 50.0, "is_alert":false, "location_point": "50.0,7.7455", "snort_field": 10, "duplicate_name_field": 1, "guid":"snort_1", "threat:triage:score":10.0}, + * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8004, "long_field": 10000, "timestamp":7, "latitude": 48.5839, "score": 10.0, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 20, "duplicate_name_field": 2, "guid":"snort_2", "threat:triage:score":20.0}, * {"source:type": "snort", "ip_src_addr":"192.168.1.7", "ip_src_port": 8003, "long_field": 10000, "timestamp":8, "latitude": 48.5839, "score": 20.0, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 30, "duplicate_name_field": 3, "guid":"snort_3"}, * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 8002, "long_field": 20000, "timestamp":9, "latitude": 48.0001, "score": 50.0, "is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 40, "duplicate_name_field": 4, "guid":"snort_4"}, * {"source:type": "snort", "ip_src_addr":"192.168.1.8", "ip_src_port": 8001, "long_field": 10000, "timestamp":10, "latitude": 48.5839, "score": 10.0, "is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 50, "duplicate_name_field": 5, "guid":"snort_5"} @@ -274,6 +273,42 @@ public abstract class SearchIntegrationTest { /** * { + * "facetFields": ["snort_field"], + * "indices": ["bro", "snort"], + * "query": "*:*", + * "from": 0, + * "size": 10, + * "sort": [ + * { + * "field": "timestamp", + * "sortOrder": "desc" + * } + * ] + * } + */ + @Multiline + public static String missingTypeFacetQuery; + + /** + * { + * "facetFields": ["duplicate_name_field"], + * "indices": ["bro", "snort"], + * "query": "*:*", + * "from": 0, + * "size": 10, + * "sort": [ + * { + * "field": "timestamp", + * "sortOrder": "desc" + * } + * ] + * } + */ + @Multiline + public static String differentTypeFacetQuery; + + /** + * { * "indices": ["bro", "snort"], * "query": "*", 
* "from": 0, @@ -419,6 +454,23 @@ public abstract class SearchIntegrationTest { @Multiline public static String groupByIpQuery; + /** + * { + * "indices": ["bro", "snort"], + * "query": "duplicate_name_field:\"data 1\"", + * "from": 0, + * "size": 10, + * "sort": [ + * { + * "field": "timestamp", + * "sortOrder": "desc" + * } + * ] + * } + */ + @Multiline + public static String differentTypeFilterQuery; + protected static IndexDao dao; protected static InMemoryComponent indexComponent; @@ -443,11 +495,11 @@ public abstract class SearchIntegrationTest { Assert.assertEquals(10, results.size()); for(int i = 0;i < 5;++i) { Assert.assertEquals("snort", results.get(i).getSource().get("source:type")); - Assert.assertEquals(10 - i, results.get(i).getSource().get("timestamp")); + Assert.assertEquals(10 - i + "", results.get(i).getSource().get("timestamp").toString()); } for (int i = 5; i < 10; ++i) { Assert.assertEquals("bro", results.get(i).getSource().get("source:type")); - Assert.assertEquals(10 - i, results.get(i).getSource().get("timestamp")); + Assert.assertEquals(10 - i + "", results.get(i).getSource().get("timestamp").toString()); } } @@ -458,7 +510,7 @@ public abstract class SearchIntegrationTest { Assert.assertTrue(response.isPresent()); Map<String, Object> doc = response.get(); Assert.assertEquals("bro", doc.get("source:type")); - Assert.assertEquals(3, doc.get("timestamp")); + Assert.assertEquals("3", doc.get("timestamp").toString()); } @Test @@ -483,11 +535,11 @@ public abstract class SearchIntegrationTest { Assert.assertEquals(3, response.getTotal()); List<SearchResult> results = response.getResults(); Assert.assertEquals("snort", results.get(0).getSource().get("source:type")); - Assert.assertEquals(9, results.get(0).getSource().get("timestamp")); + Assert.assertEquals("9", results.get(0).getSource().get("timestamp").toString()); Assert.assertEquals("snort", results.get(1).getSource().get("source:type")); - Assert.assertEquals(7, 
results.get(1).getSource().get("timestamp")); + Assert.assertEquals("7", results.get(1).getSource().get("timestamp").toString()); Assert.assertEquals("bro", results.get(2).getSource().get("source:type")); - Assert.assertEquals(1, results.get(2).getSource().get("timestamp")); + Assert.assertEquals("1", results.get(2).getSource().get("timestamp").toString()); } @Test @@ -515,8 +567,8 @@ public abstract class SearchIntegrationTest { } // validate sorted order - there are only 2 with a 'threat:triage:score' - Assert.assertEquals("10", results.get(8).getSource().get("threat:triage:score")); - Assert.assertEquals("20", results.get(9).getSource().get("threat:triage:score")); + Assert.assertEquals("10.0", results.get(8).getSource().get("threat:triage:score").toString()); + Assert.assertEquals("20.0", results.get(9).getSource().get("threat:triage:score").toString()); } @Test @@ -528,8 +580,8 @@ public abstract class SearchIntegrationTest { Assert.assertEquals(10, results.size()); // validate sorted order - there are only 2 with a 'threat:triage:score' - Assert.assertEquals("20", results.get(0).getSource().get("threat:triage:score")); - Assert.assertEquals("10", results.get(1).getSource().get("threat:triage:score")); + Assert.assertEquals("20.0", results.get(0).getSource().get("threat:triage:score").toString()); + Assert.assertEquals("10.0", results.get(1).getSource().get("threat:triage:score").toString()); // the remaining are missing the 'threat:triage:score' and should be sorted last for (int i = 2; i < 10; i++) { @@ -545,11 +597,11 @@ public abstract class SearchIntegrationTest { List<SearchResult> results = response.getResults(); Assert.assertEquals(3, results.size()); Assert.assertEquals("snort", results.get(0).getSource().get("source:type")); - Assert.assertEquals(6, results.get(0).getSource().get("timestamp")); + Assert.assertEquals("6", results.get(0).getSource().get("timestamp").toString()); Assert.assertEquals("bro", results.get(1).getSource().get("source:type")); 
- Assert.assertEquals(5, results.get(1).getSource().get("timestamp")); + Assert.assertEquals("5", results.get(1).getSource().get("timestamp").toString()); Assert.assertEquals("bro", results.get(2).getSource().get("source:type")); - Assert.assertEquals(4, results.get(2).getSource().get("timestamp")); + Assert.assertEquals("4", results.get(2).getSource().get("timestamp").toString()); } @Test @@ -560,7 +612,7 @@ public abstract class SearchIntegrationTest { List<SearchResult> results = response.getResults(); for (int i = 5, j = 0; i > 0; i--, j++) { Assert.assertEquals("bro", results.get(j).getSource().get("source:type")); - Assert.assertEquals(i, results.get(j).getSource().get("timestamp")); + Assert.assertEquals(i + "", results.get(j).getSource().get("timestamp").toString()); } } @@ -640,14 +692,6 @@ public abstract class SearchIntegrationTest { } @Test - public void bad_facet_query_throws_exception() throws Exception { - thrown.expect(InvalidSearchException.class); - thrown.expectMessage("Failed to execute search"); - SearchRequest request = JSONUtils.INSTANCE.load(badFacetQuery, SearchRequest.class); - dao.search(request); - } - - @Test public void disabled_facet_query_returns_null_count() throws Exception { SearchRequest request = JSONUtils.INSTANCE.load(disabledFacetQuery, SearchRequest.class); SearchResponse response = dao.search(request); @@ -655,7 +699,33 @@ public abstract class SearchIntegrationTest { } @Test - public void exceeding_max_resulsts_throws_exception() throws Exception { + public void missing_type_facet_query() throws Exception { + SearchRequest request = JSONUtils.INSTANCE.load(missingTypeFacetQuery, SearchRequest.class); + SearchResponse response = dao.search(request); + Assert.assertEquals(10, response.getTotal()); + + Map<String, Map<String, Long>> facetCounts = response.getFacetCounts(); + Assert.assertEquals(1, facetCounts.size()); + Map<String, Long> snortFieldCounts = facetCounts.get("snort_field"); + Assert.assertEquals(5, 
snortFieldCounts.size()); + Assert.assertEquals(1L, snortFieldCounts.get("50").longValue()); + Assert.assertEquals(1L, snortFieldCounts.get("40").longValue()); + Assert.assertEquals(1L, snortFieldCounts.get("30").longValue()); + Assert.assertEquals(1L, snortFieldCounts.get("20").longValue()); + Assert.assertEquals(1L, snortFieldCounts.get("10").longValue()); + response.getFacetCounts(); + } + + @Test + public void different_type_facet_query() throws Exception { + thrown.expect(Exception.class); + SearchRequest request = JSONUtils.INSTANCE.load(differentTypeFacetQuery, SearchRequest.class); + SearchResponse response = dao.search(request); + Assert.assertEquals(3, response.getTotal()); + } + + @Test + public void exceeding_max_results_throws_exception() throws Exception { thrown.expect(InvalidSearchException.class); thrown.expectMessage("Search result size must be less than 100"); SearchRequest request = JSONUtils.INSTANCE.load(exceededMaxResultsQuery, SearchRequest.class); @@ -663,68 +733,7 @@ public abstract class SearchIntegrationTest { } @Test - public void returns_column_data_for_multiple_indices() throws Exception { - Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Arrays.asList("bro", "snort")); - Assert.assertEquals(15, fieldTypes.size()); - Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("guid")); - Assert.assertEquals(FieldType.TEXT, fieldTypes.get("source:type")); - Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr")); - Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port")); - Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field")); - Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp")); - Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude")); - Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score")); - Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert")); - Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point")); - 
Assert.assertEquals(FieldType.TEXT, fieldTypes.get("bro_field")); - Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("snort_field")); - //NOTE: This is because the field is in both bro and snort and they have different types. - Assert.assertEquals(FieldType.OTHER, fieldTypes.get("duplicate_name_field")); - Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("threat:triage:score")); - Assert.assertEquals(FieldType.OTHER, fieldTypes.get("alert")); - } - - @Test - public void returns_column_metadata_for_specified_indices() throws Exception { - // getColumnMetadata with only bro - { - Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Collections.singletonList("bro")); - Assert.assertEquals(13, fieldTypes.size()); - Assert.assertEquals(FieldType.TEXT, fieldTypes.get("bro_field")); - Assert.assertEquals(FieldType.TEXT, fieldTypes.get("duplicate_name_field")); - Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("guid")); - Assert.assertEquals(FieldType.TEXT, fieldTypes.get("source:type")); - Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr")); - Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port")); - Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field")); - Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp")); - Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude")); - Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score")); - Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert")); - Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point")); - Assert.assertEquals(FieldType.TEXT, fieldTypes.get("bro_field")); - Assert.assertEquals(FieldType.TEXT, fieldTypes.get("duplicate_name_field")); - Assert.assertEquals(FieldType.OTHER, fieldTypes.get("alert")); - } - // getColumnMetadata with only snort - { - Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Collections.singletonList("snort")); - Assert.assertEquals(14, fieldTypes.size()); - 
Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("snort_field")); - Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("duplicate_name_field")); - Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("guid")); - Assert.assertEquals(FieldType.TEXT, fieldTypes.get("source:type")); - Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr")); - Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port")); - Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field")); - Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp")); - Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude")); - Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score")); - Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert")); - Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point")); - Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("duplicate_name_field")); - Assert.assertEquals(FieldType.OTHER, fieldTypes.get("alert")); - } + public void column_metadata_for_missing_index() throws Exception { // getColumnMetadata with an index that doesn't exist { Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Collections.singletonList("someindex")); @@ -732,6 +741,12 @@ public abstract class SearchIntegrationTest { } } + @Test + public void no_results_returned_when_query_does_not_match() throws Exception { + SearchRequest request = JSONUtils.INSTANCE.load(noResultsFieldsQuery, SearchRequest.class); + SearchResponse response = dao.search(request); + Assert.assertEquals(0, response.getTotal()); + } @Test public void group_by_ip_query() throws Exception { @@ -757,13 +772,6 @@ public abstract class SearchIntegrationTest { } @Test - public void no_results_returned_when_query_does_not_match() throws Exception { - SearchRequest request = JSONUtils.INSTANCE.load(noResultsFieldsQuery, SearchRequest.class); - SearchResponse response = dao.search(request); - Assert.assertEquals(0, response.getTotal()); - } - - @Test public 
void group_by_returns_results_in_groups() throws Exception { // Group by test case, default order is count descending GroupRequest request = JSONUtils.INSTANCE.load(groupByQuery, GroupRequest.class); @@ -896,15 +904,6 @@ public abstract class SearchIntegrationTest { } @Test - public void throws_exception_on_aggregation_queries_on_non_string_non_numeric_fields() - throws Exception { - thrown.expect(InvalidSearchException.class); - thrown.expectMessage("Failed to execute search"); - GroupRequest request = JSONUtils.INSTANCE.load(badGroupQuery, GroupRequest.class); - dao.group(request); - } - - @Test public void queries_fields() throws Exception { SearchRequest request = JSONUtils.INSTANCE.load(fieldsQuery, SearchRequest.class); SearchResponse response = dao.search(request); @@ -940,6 +939,14 @@ public abstract class SearchIntegrationTest { indexComponent.stop(); } + @Test + public abstract void returns_column_data_for_multiple_indices() throws Exception; + @Test + public abstract void returns_column_metadata_for_specified_indices() throws Exception; + @Test + public abstract void different_type_filter_query() throws Exception; + + protected abstract IndexDao createDao() throws Exception; protected abstract InMemoryComponent startIndex() throws Exception; protected abstract void loadTestData() throws Exception; http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/UpdateIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/UpdateIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/UpdateIntegrationTest.java new file mode 100644 index 0000000..369fa79 --- /dev/null +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/UpdateIntegrationTest.java @@ -0,0 +1,199 @@ +/** + * Licensed to the 
Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.metron.indexing.dao; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Optional; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.metron.common.Constants; +import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.hbase.mock.MockHBaseTableProvider; +import org.apache.metron.hbase.mock.MockHTable; +import org.apache.metron.indexing.dao.update.Document; +import org.apache.metron.indexing.dao.update.ReplaceRequest; +import org.apache.metron.integration.InMemoryComponent; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public abstract class UpdateIntegrationTest { + + private static final int MAX_RETRIES = 10; + private static final int SLEEP_MS = 500; + protected static final String SENSOR_NAME= "test"; + private static final String TABLE_NAME = "modifications"; + private static final String CF = "p"; + private static String index; + private static 
MockHTable table; + private static IndexDao hbaseDao; + + protected static MultiIndexDao dao; + protected static InMemoryComponent indexComponent; + + @Before + public void setup() throws Exception { + if(dao == null && indexComponent == null) { + index = getIndexName(); + indexComponent = startIndex(); + loadTestData(); + Configuration config = HBaseConfiguration.create(); + MockHBaseTableProvider tableProvider = new MockHBaseTableProvider(); + tableProvider.addToCache(TABLE_NAME, CF); + table = (MockHTable)tableProvider.getTable(config, TABLE_NAME); + + hbaseDao = new HBaseDao(); + AccessConfig accessConfig = new AccessConfig(); + accessConfig.setTableProvider(tableProvider); + Map<String, Object> globalConfig = createGlobalConfig(); + globalConfig.put(HBaseDao.HBASE_TABLE, TABLE_NAME); + globalConfig.put(HBaseDao.HBASE_CF, CF); + accessConfig.setGlobalConfigSupplier(() -> globalConfig); + + dao = new MultiIndexDao(hbaseDao, createDao()); + dao.init(accessConfig); + } + } + + @Test + public void test() throws Exception { + List<Map<String, Object>> inputData = new ArrayList<>(); + for(int i = 0; i < 10;++i) { + final String name = "message" + i; + inputData.add( + new HashMap<String, Object>() {{ + put("source:type", SENSOR_NAME); + put("name" , name); + put("timestamp", System.currentTimeMillis()); + put(Constants.GUID, name); + }} + ); + } + addTestData(index, SENSOR_NAME, inputData); + List<Map<String,Object>> docs = null; + for(int t = 0;t < MAX_RETRIES;++t, Thread.sleep(SLEEP_MS)) { + docs = getIndexedTestData(index, SENSOR_NAME); + if(docs.size() >= 10) { + break; + } + } + Assert.assertEquals(10, docs.size()); + //modify the first message and add a new field + { + Map<String, Object> message0 = new HashMap<String, Object>(inputData.get(0)) {{ + put("new-field", "metron"); + }}; + String guid = "" + message0.get(Constants.GUID); + dao.replace(new ReplaceRequest(){{ + setReplacement(message0); + setGuid(guid); + setSensorType(SENSOR_NAME); + setIndex(index); 
+ }}, Optional.empty()); + + Assert.assertEquals(1, table.size()); + Document doc = dao.getLatest(guid, SENSOR_NAME); + Assert.assertEquals(message0, doc.getDocument()); + { + //ensure hbase is up to date + Get g = new Get(HBaseDao.Key.toBytes(new HBaseDao.Key(guid, SENSOR_NAME))); + Result r = table.get(g); + NavigableMap<byte[], byte[]> columns = r.getFamilyMap(CF.getBytes()); + Assert.assertEquals(1, columns.size()); + Assert.assertEquals(message0 + , JSONUtils.INSTANCE.load(new String(columns.lastEntry().getValue()) + , JSONUtils.MAP_SUPPLIER) + ); + } + { + //ensure ES is up-to-date + long cnt = 0; + for (int t = 0; t < MAX_RETRIES && cnt == 0; ++t, Thread.sleep(SLEEP_MS)) { + docs = getIndexedTestData(index, SENSOR_NAME); + cnt = docs + .stream() + .filter(d -> message0.get("new-field").equals(d.get("new-field"))) + .count(); + } + Assert.assertNotEquals("Elasticsearch is not updated!", cnt, 0); + } + } + //modify the same message and modify the new field + { + Map<String, Object> message0 = new HashMap<String, Object>(inputData.get(0)) {{ + put("new-field", "metron2"); + }}; + String guid = "" + message0.get(Constants.GUID); + dao.replace(new ReplaceRequest(){{ + setReplacement(message0); + setGuid(guid); + setSensorType(SENSOR_NAME); + setIndex(index); + }}, Optional.empty()); + Assert.assertEquals(1, table.size()); + Document doc = dao.getLatest(guid, SENSOR_NAME); + Assert.assertEquals(message0, doc.getDocument()); + { + //ensure hbase is up to date + Get g = new Get(HBaseDao.Key.toBytes(new HBaseDao.Key(guid, SENSOR_NAME))); + Result r = table.get(g); + NavigableMap<byte[], byte[]> columns = r.getFamilyMap(CF.getBytes()); + Assert.assertEquals(2, columns.size()); + Assert.assertEquals(message0, JSONUtils.INSTANCE.load(new String(columns.lastEntry().getValue()) + , JSONUtils.MAP_SUPPLIER) + ); + Assert.assertNotEquals(message0, JSONUtils.INSTANCE.load(new String(columns.firstEntry().getValue()) + , JSONUtils.MAP_SUPPLIER) + ); + } + { + //ensure ES is 
up-to-date + long cnt = 0; + for (int t = 0; t < MAX_RETRIES && cnt == 0; ++t,Thread.sleep(SLEEP_MS)) { + docs = getIndexedTestData(index, SENSOR_NAME); + cnt = docs + .stream() + .filter(d -> message0.get("new-field").equals(d.get("new-field"))) + .count(); + } + + Assert.assertNotEquals("Elasticsearch is not updated!", cnt, 0); + } + } + } + + @AfterClass + public static void teardown() { + if(indexComponent != null) { + indexComponent.stop(); + } + } + + protected abstract String getIndexName(); + protected abstract Map<String, Object> createGlobalConfig() throws Exception; + protected abstract IndexDao createDao() throws Exception; + protected abstract InMemoryComponent startIndex() throws Exception; + protected abstract void loadTestData() throws Exception; + protected abstract void addTestData(String indexName, String sensorType, List<Map<String,Object>> docs) throws Exception; + protected abstract List<Map<String,Object>> getIndexedTestData(String indexName, String sensorType) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-solr/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-solr/pom.xml b/metron-platform/metron-solr/pom.xml index 9c9c7fb..9b2e806 100644 --- a/metron-platform/metron-solr/pom.xml +++ b/metron-platform/metron-solr/pom.xml @@ -63,6 +63,14 @@ <artifactId>fastutil</artifactId> <groupId>it.unimi.dsi</groupId> </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-core</artifactId> + </exclusion> + <exclusion> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </exclusion> </exclusions> </dependency> <dependency> @@ -192,11 +200,15 @@ <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> </exclusion> + <exclusion> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-core</artifactId> + </exclusion> </exclusions> 
</dependency> <dependency> <groupId>org.mockito</groupId> - <artifactId>mockito-all</artifactId> + <artifactId>mockito-core</artifactId> <version>${global_mockito_version}</version> <scope>test</scope> </dependency> @@ -231,6 +243,29 @@ </exclusion> </exclusions> </dependency> + <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-hbase</artifactId> + <version>${project.parent.version}</version> + <scope>test</scope> + <type>test-jar</type> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-core</artifactId> + <version>1.3</version> + <scope>test</scope> + </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrColumnMetadataDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrColumnMetadataDao.java b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrColumnMetadataDao.java new file mode 100644 index 0000000..f645e93 --- /dev/null +++ b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrColumnMetadataDao.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.solr.dao; + +import com.google.common.collect.Sets; +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.metron.indexing.dao.ColumnMetadataDao; +import org.apache.metron.indexing.dao.search.FieldType; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.request.schema.SchemaRequest; +import org.apache.solr.client.solrj.response.schema.SchemaRepresentation; +import org.apache.solr.common.SolrException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SolrColumnMetadataDao implements ColumnMetadataDao { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private static Map<String, FieldType> solrTypeMap; + + static { + Map<String, FieldType> fieldTypeMap = new HashMap<>(); + fieldTypeMap.put("string", FieldType.TEXT); + fieldTypeMap.put("pint", FieldType.INTEGER); + fieldTypeMap.put("plong", FieldType.LONG); + fieldTypeMap.put("pfloat", FieldType.FLOAT); + fieldTypeMap.put("pdouble", FieldType.DOUBLE); + fieldTypeMap.put("boolean", FieldType.BOOLEAN); + solrTypeMap = Collections.unmodifiableMap(fieldTypeMap); + } + + private String zkHost; + + public SolrColumnMetadataDao(String zkHost) { + this.zkHost = zkHost; + } + + @Override + public Map<String, FieldType> 
getColumnMetadata(List<String> indices) throws IOException { + Map<String, FieldType> indexColumnMetadata = new HashMap<>(); + Map<String, String> previousIndices = new HashMap<>(); + Set<String> fieldBlackList = Sets.newHashSet(SolrDao.ROOT_FIELD, SolrDao.VERSION_FIELD); + + for (String index : indices) { + CloudSolrClient client = new CloudSolrClient.Builder().withZkHost(zkHost).build(); + client.setDefaultCollection(index); + try { + SchemaRepresentation schemaRepresentation = new SchemaRequest().process(client) + .getSchemaRepresentation(); + schemaRepresentation.getFields().stream().forEach(field -> { + String name = (String) field.get("name"); + if (!fieldBlackList.contains(name)) { + FieldType type = toFieldType((String) field.get("type")); + if (!indexColumnMetadata.containsKey(name)) { + indexColumnMetadata.put(name, type); + + // record the last index in which a field exists, to be able to print helpful error message on type mismatch + previousIndices.put(name, index); + } else { + FieldType previousType = indexColumnMetadata.get(name); + if (!type.equals(previousType)) { + String previousIndexName = previousIndices.get(name); + LOG.error(String.format( + "Field type mismatch: %s.%s has type %s while %s.%s has type %s. Defaulting type to %s.", + index, field, type.getFieldType(), + previousIndexName, field, previousType.getFieldType(), + FieldType.OTHER.getFieldType())); + indexColumnMetadata.put(name, FieldType.OTHER); + + // the field is defined in multiple indices with different types; ignore the field as type has been set to OTHER + fieldBlackList.add(name); + } + } + } + }); + } catch (SolrServerException e) { + throw new IOException(e); + } catch (SolrException e) { + // 400 means an index is missing so continue + if (e.code() != 400) { + throw new IOException(e); + } + } + } + return indexColumnMetadata; + } + + /** + * Converts a string type to the corresponding FieldType. + * + * @param type The type to convert. 
+ * @return The corresponding FieldType or FieldType.OTHER, if no match. + */ + private FieldType toFieldType(String type) { + return solrTypeMap.getOrDefault(type, FieldType.OTHER); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrDao.java b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrDao.java new file mode 100644 index 0000000..024755a --- /dev/null +++ b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrDao.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.solr.dao; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.metron.indexing.dao.AccessConfig; +import org.apache.metron.indexing.dao.ColumnMetadataDao; +import org.apache.metron.indexing.dao.IndexDao; +import org.apache.metron.indexing.dao.search.FieldType; +import org.apache.metron.indexing.dao.search.GetRequest; +import org.apache.metron.indexing.dao.search.GroupRequest; +import org.apache.metron.indexing.dao.search.GroupResponse; +import org.apache.metron.indexing.dao.search.InvalidSearchException; +import org.apache.metron.indexing.dao.search.SearchRequest; +import org.apache.metron.indexing.dao.search.SearchResponse; +import org.apache.metron.indexing.dao.update.Document; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SolrDao implements IndexDao { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + public static final String ROOT_FIELD = "_root_"; + public static final String VERSION_FIELD = "_version_"; + + private transient SolrClient client; + private SolrSearchDao solrSearchDao; + private SolrUpdateDao solrUpdateDao; + private ColumnMetadataDao solrColumnMetadataDao; + + private AccessConfig accessConfig; + + protected SolrDao(SolrClient client, + AccessConfig config, + SolrSearchDao solrSearchDao, + SolrUpdateDao solrUpdateDao, + SolrColumnMetadataDao solrColumnMetadataDao) { + this.client = client; + this.accessConfig = config; + this.solrSearchDao = solrSearchDao; + this.solrUpdateDao = solrUpdateDao; + this.solrColumnMetadataDao = solrColumnMetadataDao; + } + + public SolrDao() { + //uninitialized. 
+ } + + @Override + public void init(AccessConfig config) { + if (this.client == null) { + Map<String, Object> globalConfig = config.getGlobalConfigSupplier().get(); + String zkHost = (String) globalConfig.get("solr.zookeeper"); + this.client = new CloudSolrClient.Builder().withZkHost((String) globalConfig.get("solr.zookeeper")).build(); + this.accessConfig = config; + this.solrSearchDao = new SolrSearchDao(this.client, this.accessConfig); + this.solrUpdateDao = new SolrUpdateDao(this.client); + this.solrColumnMetadataDao = new SolrColumnMetadataDao(zkHost); + } + } + + @Override + public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException { + return this.solrSearchDao.search(searchRequest); + } + + @Override + public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException { + return this.solrSearchDao.group(groupRequest); + } + + @Override + public Document getLatest(String guid, String collection) throws IOException { + return this.solrSearchDao.getLatest(guid, collection); + } + + @Override + public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException { + return this.solrSearchDao.getAllLatest(getRequests); + } + + @Override + public void update(Document update, Optional<String> index) throws IOException { + this.solrUpdateDao.update(update, index); + } + + @Override + public void batchUpdate(Map<Document, Optional<String>> updates) throws IOException { + this.solrUpdateDao.batchUpdate(updates); + } + + @Override + public Map<String, FieldType> getColumnMetadata(List<String> indices) throws IOException { + return this.solrColumnMetadataDao.getColumnMetadata(indices); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrSearchDao.java ---------------------------------------------------------------------- diff --git 
a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrSearchDao.java b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrSearchDao.java new file mode 100644 index 0000000..031d1d5 --- /dev/null +++ b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrSearchDao.java @@ -0,0 +1,310 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.solr.dao; + +import com.fasterxml.jackson.core.JsonProcessingException; +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.metron.common.Constants; +import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.indexing.dao.AccessConfig; +import org.apache.metron.indexing.dao.search.GetRequest; +import org.apache.metron.indexing.dao.search.Group; +import org.apache.metron.indexing.dao.search.GroupOrder; +import org.apache.metron.indexing.dao.search.GroupOrderType; +import org.apache.metron.indexing.dao.search.GroupRequest; +import org.apache.metron.indexing.dao.search.GroupResponse; +import org.apache.metron.indexing.dao.search.GroupResult; +import org.apache.metron.indexing.dao.search.InvalidSearchException; +import org.apache.metron.indexing.dao.search.SearchDao; +import org.apache.metron.indexing.dao.search.SearchRequest; +import org.apache.metron.indexing.dao.search.SearchResponse; +import org.apache.metron.indexing.dao.search.SearchResult; +import org.apache.metron.indexing.dao.search.SortField; +import org.apache.metron.indexing.dao.search.SortOrder; +import org.apache.metron.indexing.dao.update.Document; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrQuery; +import org.apache.solr.client.solrj.SolrQuery.ORDER; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.client.solrj.response.FacetField; +import org.apache.solr.client.solrj.response.FacetField.Count; +import org.apache.solr.client.solrj.response.PivotField; +import org.apache.solr.client.solrj.response.QueryResponse; +import org.apache.solr.common.SolrDocument; +import org.apache.solr.common.SolrDocumentList; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SolrSearchDao implements SearchDao { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private transient SolrClient client; + private AccessConfig accessConfig; + + public SolrSearchDao(SolrClient client, AccessConfig accessConfig) { + this.client = client; + this.accessConfig = accessConfig; + } + + @Override + public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException { + if (searchRequest.getQuery() == null) { + throw new InvalidSearchException("Search query is invalid: null"); + } + if (client == null) { + throw new InvalidSearchException("Uninitialized Dao! You must call init() prior to use."); + } + if (searchRequest.getSize() > accessConfig.getMaxSearchResults()) { + throw new InvalidSearchException( + "Search result size must be less than " + accessConfig.getMaxSearchResults()); + } + SolrQuery query = buildSearchRequest(searchRequest); + try { + QueryResponse response = client.query(query); + return buildSearchResponse(searchRequest, response); + } catch (IOException | SolrServerException e) { + String msg = e.getMessage(); + LOG.error(msg, e); + throw new InvalidSearchException(msg, e); + } + } + + @Override + public GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException { + String groupNames = groupRequest.getGroups().stream().map(Group::getField).collect( + Collectors.joining(",")); + SolrQuery query = new SolrQuery() + .setStart(0) + .setRows(0) + .setQuery(groupRequest.getQuery()); + query.set("collection", "bro,snort"); + Optional<String> scoreField = groupRequest.getScoreField(); + if (scoreField.isPresent()) { + query.set("stats", true); + query.set("stats.field", String.format("{!tag=piv1 sum=true}%s", scoreField.get())); + } + query.set("facet", true); + query.set("facet.pivot", String.format("{!stats=piv1}%s", groupNames)); + try { + QueryResponse response = 
client.query(query); + return buildGroupResponse(groupRequest, response); + } catch (IOException | SolrServerException e) { + String msg = e.getMessage(); + LOG.error(msg, e); + throw new InvalidSearchException(msg, e); + } + } + + @Override + public Document getLatest(String guid, String collection) throws IOException { + try { + SolrDocument solrDocument = client.getById(collection, guid); + return toDocument(solrDocument); + } catch (SolrServerException e) { + throw new IOException(e); + } + } + + @Override + public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException { + Map<String, Collection<String>> collectionIdMap = new HashMap<>(); + for (GetRequest getRequest: getRequests) { + Collection<String> ids = collectionIdMap.getOrDefault(getRequest.getSensorType(), new HashSet<>()); + ids.add(getRequest.getGuid()); + collectionIdMap.put(getRequest.getSensorType(), ids); + } + try { + List<Document> documents = new ArrayList<>(); + for (String collection: collectionIdMap.keySet()) { + SolrDocumentList solrDocumentList = client.getById(collectionIdMap.get(collection), + new SolrQuery().set("collection", collection)); + documents.addAll(solrDocumentList.stream().map(this::toDocument).collect(Collectors.toList())); + } + return documents; + } catch (SolrServerException e) { + throw new IOException(e); + } + } + + private SolrQuery buildSearchRequest( + SearchRequest searchRequest) { + SolrQuery query = new SolrQuery() + .setStart(searchRequest.getFrom()) + .setRows(searchRequest.getSize()) + .setQuery(searchRequest.getQuery()); + + // handle sort fields + for (SortField sortField : searchRequest.getSort()) { + query.addSort(sortField.getField(), getSolrSortOrder(sortField.getSortOrder())); + } + + // handle search fields + Optional<List<String>> fields = searchRequest.getFields(); + if (fields.isPresent()) { + fields.get().forEach(query::addField); + } + + //handle facet fields + Optional<List<String>> facetFields = 
searchRequest.getFacetFields(); + if (facetFields.isPresent()) { + facetFields.get().forEach(query::addFacetField); + } + + String collections = searchRequest.getIndices().stream().collect(Collectors.joining(",")); + query.set("collection", collections); + + return query; + } + + private SolrQuery.ORDER getSolrSortOrder( + SortOrder sortOrder) { + return sortOrder == SortOrder.DESC ? + ORDER.desc : ORDER.asc; + } + + private SearchResponse buildSearchResponse( + SearchRequest searchRequest, + QueryResponse solrResponse) { + + SearchResponse searchResponse = new SearchResponse(); + SolrDocumentList solrDocumentList = solrResponse.getResults(); + searchResponse.setTotal(solrDocumentList.getNumFound()); + + // search hits --> search results + List<SearchResult> results = solrDocumentList.stream() + .map(solrDocument -> getSearchResult(solrDocument, searchRequest.getFields())) + .collect(Collectors.toList()); + searchResponse.setResults(results); + + // handle facet fields + Optional<List<String>> facetFields = searchRequest.getFacetFields(); + if (facetFields.isPresent()) { + searchResponse.setFacetCounts(getFacetCounts(facetFields.get(), solrResponse)); + } + + if (LOG.isDebugEnabled()) { + String response; + try { + response = JSONUtils.INSTANCE.toJSON(searchResponse, false); + } catch (JsonProcessingException e) { + response = e.getMessage(); + } + LOG.debug("Built search response; response={}", response); + } + return searchResponse; + } + + private SearchResult getSearchResult(SolrDocument solrDocument, Optional<List<String>> fields) { + SearchResult searchResult = new SearchResult(); + searchResult.setId((String) solrDocument.getFieldValue(Constants.GUID)); + Map<String, Object> source; + if (fields.isPresent()) { + source = new HashMap<>(); + fields.get().forEach(field -> source.put(field, solrDocument.getFieldValue(field))); + } else { + source = solrDocument.getFieldValueMap(); + } + searchResult.setSource(source); + return searchResult; + } + + private 
Map<String, Map<String, Long>> getFacetCounts(List<String> fields, + QueryResponse solrResponse) { + Map<String, Map<String, Long>> fieldCounts = new HashMap<>(); + for (String field : fields) { + Map<String, Long> valueCounts = new HashMap<>(); + FacetField facetField = solrResponse.getFacetField(field); + for (Count facetCount : facetField.getValues()) { + valueCounts.put(facetCount.getName(), facetCount.getCount()); + } + fieldCounts.put(field, valueCounts); + } + return fieldCounts; + } + + /** + * Build a group response. + * @param groupRequest The original group request. + * @param response The search response. + * @return A group response. + */ + private GroupResponse buildGroupResponse( + GroupRequest groupRequest, + QueryResponse response) { + String groupNames = groupRequest.getGroups().stream().map(Group::getField).collect( + Collectors.joining(",")); + List<PivotField> pivotFields = response.getFacetPivot().get(groupNames); + GroupResponse groupResponse = new GroupResponse(); + groupResponse.setGroupedBy(groupRequest.getGroups().get(0).getField()); + groupResponse.setGroupResults(getGroupResults(groupRequest, 0, pivotFields)); + return groupResponse; + } + + private List<GroupResult> getGroupResults(GroupRequest groupRequest, int index, List<PivotField> pivotFields) { + List<Group> groups = groupRequest.getGroups(); + List<GroupResult> searchResultGroups = new ArrayList<>(); + final GroupOrder groupOrder = groups.get(index).getOrder(); + pivotFields.sort((o1, o2) -> { + String s1 = groupOrder.getGroupOrderType() == GroupOrderType.TERM ? + o1.getValue().toString() : Integer.toString(o1.getCount()); + String s2 = groupOrder.getGroupOrderType() == GroupOrderType.TERM ? 
+ o2.getValue().toString() : Integer.toString(o2.getCount()); + if (groupOrder.getSortOrder() == SortOrder.ASC) { + return s1.compareTo(s2); + } else { + return s2.compareTo(s1); + } + }); + + for(PivotField pivotField: pivotFields) { + GroupResult groupResult = new GroupResult(); + groupResult.setKey(pivotField.getValue().toString()); + groupResult.setTotal(pivotField.getCount()); + Optional<String> scoreField = groupRequest.getScoreField(); + if (scoreField.isPresent()) { + groupResult.setScore((Double) pivotField.getFieldStatsInfo().get("score").getSum()); + } + if (index < groups.size() - 1) { + groupResult.setGroupedBy(groups.get(index + 1).getField()); + groupResult.setGroupResults(getGroupResults(groupRequest, index + 1, pivotField.getPivot())); + } + searchResultGroups.add(groupResult); + } + return searchResultGroups; + } + + private Document toDocument(SolrDocument solrDocument) { + Map<String, Object> document = new HashMap<>(); + solrDocument.getFieldNames().stream() + .filter(name -> !name.equals(SolrDao.VERSION_FIELD)) + .forEach(name -> document.put(name, solrDocument.getFieldValue(name))); + return new Document(document, + (String) solrDocument.getFieldValue(Constants.GUID), + (String) solrDocument.getFieldValue("source:type"), 0L); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrUpdateDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrUpdateDao.java b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrUpdateDao.java new file mode 100644 index 0000000..f25253d --- /dev/null +++ b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrUpdateDao.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.solr.dao; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import org.apache.metron.indexing.dao.update.Document; +import org.apache.metron.indexing.dao.update.UpdateDao; +import org.apache.solr.client.solrj.SolrClient; +import org.apache.solr.client.solrj.SolrServerException; +import org.apache.solr.common.SolrInputDocument; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SolrUpdateDao implements UpdateDao { + + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private transient SolrClient client; + + public SolrUpdateDao(SolrClient client) { + this.client = client; + } + + @Override + public void update(Document update, Optional<String> index) throws IOException { + try { + SolrInputDocument solrInputDocument = toSolrInputDocument(update); + if (index.isPresent()) { + this.client.add(index.get(), solrInputDocument); + } else { + this.client.add(solrInputDocument); + } + } catch (SolrServerException e) { + throw new IOException(e); + } + } + + @Override + public void
batchUpdate(Map<Document, Optional<String>> updates) throws IOException { + // updates with a collection specified + Map<String, Collection<SolrInputDocument>> solrCollectionUpdates = new HashMap<>(); + + // updates with no collection specified + Collection<SolrInputDocument> solrUpdates = new ArrayList<>(); + + for(Entry<Document, Optional<String>> entry: updates.entrySet()) { + SolrInputDocument solrInputDocument = toSolrInputDocument(entry.getKey()); + Optional<String> index = entry.getValue(); + if (index.isPresent()) { + Collection<SolrInputDocument> solrInputDocuments = solrCollectionUpdates.get(index.get()); + if (solrInputDocuments == null) { + solrInputDocuments = new ArrayList<>(); + } + solrInputDocuments.add(solrInputDocument); + solrCollectionUpdates.put(index.get(), solrInputDocuments); + } else { + solrUpdates.add(solrInputDocument); + } + } + try { + // A batch may mix both kinds of update: send every per-collection group, then any default-collection documents. + for(Entry<String, Collection<SolrInputDocument>> entry: solrCollectionUpdates.entrySet()) { + this.client.add(entry.getKey(), entry.getValue()); + } + if (!solrUpdates.isEmpty()) { + this.client.add(solrUpdates); + } + } catch (SolrServerException e) { + throw new IOException(e); + } + } + + private SolrInputDocument toSolrInputDocument(Document document) { + SolrInputDocument solrInputDocument = new SolrInputDocument(); + document.getDocument().forEach(solrInputDocument::addField); + return solrInputDocument; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java index 7c907fd..09e88a4 100644 --- 
a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java +++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java @@ -18,6 +18,10 @@ package org.apache.metron.solr.integration; import com.google.common.base.Function; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import javax.annotation.Nullable; import org.apache.metron.common.configuration.Configurations; import org.apache.metron.common.configuration.ConfigurationsUtils; import org.apache.metron.common.interfaces.FieldNameConverter; @@ -33,12 +37,6 @@ import org.apache.metron.integration.components.KafkaComponent; import org.apache.metron.integration.components.ZKServerComponent; import org.apache.metron.solr.integration.components.SolrComponent; -import javax.annotation.Nullable; -import java.io.IOException; -import java.util.List; -import java.util.Map; -import java.util.Properties; - public class SolrIndexingIntegrationTest extends IndexingIntegrationTest { private String collection = "metron"; http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrSearchIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrSearchIntegrationTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrSearchIntegrationTest.java new file mode 100644 index 0000000..a9ce650 --- /dev/null +++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrSearchIntegrationTest.java @@ -0,0 +1,153 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.solr.integration; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.indexing.dao.AccessConfig; +import org.apache.metron.indexing.dao.IndexDao; +import org.apache.metron.indexing.dao.SearchIntegrationTest; +import org.apache.metron.indexing.dao.search.FieldType; +import org.apache.metron.indexing.dao.search.InvalidSearchException; +import org.apache.metron.indexing.dao.search.SearchRequest; +import org.apache.metron.indexing.dao.search.SearchResponse; +import org.apache.metron.integration.InMemoryComponent; +import org.apache.metron.solr.dao.SolrDao; +import org.apache.metron.solr.integration.components.SolrComponent; +import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.json.simple.JSONArray; +import org.json.simple.parser.JSONParser; +import org.junit.Assert; +import org.junit.Test; + +public class SolrSearchIntegrationTest extends SearchIntegrationTest { + + private SolrComponent solrComponent; + + @Override + protected IndexDao createDao() throws Exception { + AccessConfig config = new AccessConfig(); + config.setMaxSearchResults(100); + config.setMaxSearchGroups(100); + config.setGlobalConfigSupplier( () -> + new HashMap<String, Object>() {{ + put("solr.zookeeper", solrComponent.getZookeeperUrl()); + }}
+ ); + + IndexDao dao = new SolrDao(); + dao.init(config); + return dao; + } + + @Override + protected InMemoryComponent startIndex() throws Exception { + solrComponent = new SolrComponent.Builder() + .addCollection("bro", "../metron-solr/src/test/resources/config/bro/conf") + .addCollection("snort", "../metron-solr/src/test/resources/config/snort/conf") + .build(); + solrComponent.start(); + return solrComponent; + } + + @SuppressWarnings("unchecked") + @Override + protected void loadTestData() throws Exception { + CloudSolrClient solrClient = solrComponent.getSolrClient(); + JSONArray broArray = (JSONArray) new JSONParser().parse(broData); + solrComponent.addDocs("bro", broArray); + solrClient.setDefaultCollection("bro"); + solrClient.commit(); + JSONArray snortArray = (JSONArray) new JSONParser().parse(snortData); + solrComponent.addDocs("snort", snortArray); + solrClient.setDefaultCollection("snort"); + solrClient.commit(); + } + + @Override + public void returns_column_metadata_for_specified_indices() throws Exception { + // getColumnMetadata with only bro + { + Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Collections.singletonList("bro")); + Assert.assertEquals(12, fieldTypes.size()); + Assert.assertEquals(FieldType.TEXT, fieldTypes.get("bro_field")); + Assert.assertEquals(FieldType.TEXT, fieldTypes.get("duplicate_name_field")); + Assert.assertEquals(FieldType.TEXT, fieldTypes.get("guid")); + Assert.assertEquals(FieldType.TEXT, fieldTypes.get("source:type")); + Assert.assertEquals(FieldType.TEXT, fieldTypes.get("ip_src_addr")); + Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port")); + Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field")); + Assert.assertEquals(FieldType.LONG, fieldTypes.get("timestamp")); + Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude")); + Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score")); + Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert")); +
Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point")); + Assert.assertNull(fieldTypes.get("snort_field")); // all 12 bro fields are asserted above, so any other field must be absent + Assert.assertNull(fieldTypes.get("threat:triage:score")); + } + // getColumnMetadata with only snort + { + Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Collections.singletonList("snort")); + Assert.assertEquals(13, fieldTypes.size()); + Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("snort_field")); + Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("duplicate_name_field")); + Assert.assertEquals(FieldType.TEXT, fieldTypes.get("guid")); + Assert.assertEquals(FieldType.TEXT, fieldTypes.get("source:type")); + Assert.assertEquals(FieldType.TEXT, fieldTypes.get("ip_src_addr")); + Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port")); + Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field")); + Assert.assertEquals(FieldType.LONG, fieldTypes.get("timestamp")); + Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude")); + Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score")); + Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert")); + Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point")); + Assert.assertNull(fieldTypes.get("bro_field")); // bro-only field must not appear in snort metadata + } + } + + @Override + public void returns_column_data_for_multiple_indices() throws Exception { + Map<String, FieldType> fieldTypes = dao.getColumnMetadata(Arrays.asList("bro", "snort")); + Assert.assertEquals(14, fieldTypes.size()); + Assert.assertEquals(FieldType.TEXT, fieldTypes.get("guid")); + Assert.assertEquals(FieldType.TEXT, fieldTypes.get("source:type")); + Assert.assertEquals(FieldType.TEXT, fieldTypes.get("ip_src_addr")); + Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port")); + Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field")); + Assert.assertEquals(FieldType.LONG, fieldTypes.get("timestamp")); +
Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude")); + Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score")); + Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert")); + Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point")); + Assert.assertEquals(FieldType.TEXT, fieldTypes.get("bro_field")); + Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("snort_field")); + //NOTE: This is because the field is in both bro and snort and they have different types. + Assert.assertEquals(FieldType.OTHER, fieldTypes.get("duplicate_name_field")); + Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("threat:triage:score")); + } + + @Test + public void different_type_filter_query() throws Exception { + thrown.expect(InvalidSearchException.class); + SearchRequest request = JSONUtils.INSTANCE.load(differentTypeFilterQuery, SearchRequest.class); + SearchResponse response = dao.search(request); + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrUpdateIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrUpdateIntegrationTest.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrUpdateIntegrationTest.java new file mode 100644 index 0000000..faa4ec4 --- /dev/null +++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrUpdateIntegrationTest.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.solr.integration; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.metron.indexing.dao.IndexDao; +import org.apache.metron.indexing.dao.UpdateIntegrationTest; +import org.apache.metron.integration.InMemoryComponent; +import org.apache.metron.solr.dao.SolrDao; +import org.apache.metron.solr.integration.components.SolrComponent; + +public class SolrUpdateIntegrationTest extends UpdateIntegrationTest { + + SolrComponent solrComponent; + + @Override + protected String getIndexName() { + return SENSOR_NAME; + } + + @Override + protected Map<String, Object> createGlobalConfig() throws Exception { + return new HashMap<String, Object>() {{ + put("solr.zookeeper", solrComponent.getZookeeperUrl()); + }}; + } + + @Override + protected IndexDao createDao() throws Exception { + return new SolrDao(); + } + + @Override + protected InMemoryComponent startIndex() throws Exception { + solrComponent = new SolrComponent.Builder().addCollection(SENSOR_NAME, "../metron-solr/src/test/resources/config/test/conf").build(); + solrComponent.start(); + return solrComponent; + } + + @Override + protected void loadTestData() throws Exception { + // Intentionally empty: nothing is preloaded here; this class writes test documents through addTestData. + } + + @Override + protected void addTestData(String indexName, String sensorType, + List<Map<String, Object>> docs) throws Exception { + solrComponent.addDocs(indexName, docs); + } + + @Override + protected List<Map<String, Object>> getIndexedTestData(String indexName, String sensorType) + throws Exception { + return solrComponent.getAllIndexedDocs(indexName);
+ } +} http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/components/SolrComponent.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/components/SolrComponent.java b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/components/SolrComponent.java index 58976a3..85d14f4 100644 --- a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/components/SolrComponent.java +++ b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/components/SolrComponent.java @@ -18,6 +18,8 @@ package org.apache.metron.solr.integration.components; import com.google.common.base.Function; +import java.util.Collection; +import java.util.stream.Collectors; import org.apache.metron.integration.InMemoryComponent; import org.apache.metron.integration.UnableToStartException; import org.apache.metron.solr.writer.MetronSolrClient; @@ -25,6 +27,7 @@ import org.apache.solr.client.solrj.SolrQuery; import org.apache.solr.client.solrj.SolrServerException; import org.apache.solr.client.solrj.embedded.JettyConfig; import org.apache.solr.client.solrj.impl.CloudSolrClient; +import org.apache.solr.client.solrj.request.CollectionAdminRequest; import org.apache.solr.client.solrj.response.QueryResponse; import org.apache.solr.cloud.MiniSolrCloudCluster; import org.apache.solr.common.SolrDocument; @@ -36,6 +39,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.solr.common.SolrInputDocument; public class SolrComponent implements InMemoryComponent { @@ -89,12 +93,12 @@ public class SolrComponent implements InMemoryComponent { try { File baseDir = Files.createTempDirectory("solrcomponent").toFile(); baseDir.deleteOnExit(); - miniSolrCloudCluster = new MiniSolrCloudCluster(1,
baseDir, new File(solrXmlPath), JettyConfig.builder().setPort(port).build()); + miniSolrCloudCluster = new MiniSolrCloudCluster(1, baseDir.toPath(), JettyConfig.builder().setPort(port).build()); for(String name: collections.keySet()) { String configPath = collections.get(name); - miniSolrCloudCluster.uploadConfigDir(new File(configPath), name); + miniSolrCloudCluster.uploadConfigSet(new File(configPath).toPath(), name); + CollectionAdminRequest.createCollection(name, 1, 1).process(miniSolrCloudCluster.getSolrClient()); } - miniSolrCloudCluster.createCollection("metron", 1, 1, "metron", new HashMap<String, String>()); if (postStartCallback != null) postStartCallback.apply(this); } catch(Exception e) { throw new UnableToStartException(e.getMessage(), e); @@ -158,4 +162,16 @@ public class SolrComponent implements InMemoryComponent { } return docs; } + + public void addDocs(String collection, List<Map<String, Object>> docs) + throws IOException, SolrServerException { + CloudSolrClient solr = miniSolrCloudCluster.getSolrClient(); + solr.setDefaultCollection(collection); // NOTE(review): side effect; this also changes the shared client's default collection, although the add below names the collection explicitly + Collection<SolrInputDocument> solrInputDocuments = docs.stream().map(doc -> { + SolrInputDocument solrInputDocument = new SolrInputDocument(); + doc.forEach(solrInputDocument::addField); + return solrInputDocument; + }).collect(Collectors.toList()); + solr.add(collection, solrInputDocuments); + } }
