http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java index c61efc5..60d009f 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java @@ -27,6 +27,7 @@ import org.json.simple.parser.JSONParser; import org.json.simple.parser.ParseException; import java.io.*; +import java.util.Map; public enum JSONUtils { INSTANCE; @@ -37,6 +38,15 @@ public enum JSONUtils { private static ThreadLocal<ObjectMapper> _mapper = ThreadLocal.withInitial(() -> new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL)); + public <T> T convert(Object original, Class<T> targetClass) { + return _mapper.get().convertValue(original, targetClass); + } + + public ObjectMapper getMapper() { + return _mapper.get(); + } + + public <T> T load(InputStream is, TypeReference<T> ref) throws IOException { return _mapper.get().readValue(is, ref); }
http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-data-management/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-data-management/pom.xml b/metron-platform/metron-data-management/pom.xml index fbfd71a..90c2c52 100644 --- a/metron-platform/metron-data-management/pom.xml +++ b/metron-platform/metron-data-management/pom.xml @@ -115,6 +115,13 @@ <version>${project.parent.version}</version> </dependency> <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-hbase</artifactId> + <version>${project.parent.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> <groupId>org.mitre.taxii</groupId> <artifactId>taxii</artifactId> <version>1.1.0.1</version> http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiIntegrationTest.java b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiIntegrationTest.java index 0223514..f94a02a 100644 --- a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiIntegrationTest.java +++ b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiIntegrationTest.java @@ -31,8 +31,9 @@ import org.apache.metron.dataloads.extractor.stix.StixExtractor; import org.apache.metron.enrichment.converter.EnrichmentConverter; import org.apache.metron.enrichment.converter.EnrichmentKey; import org.apache.metron.enrichment.converter.EnrichmentValue; -import org.apache.metron.test.mock.MockHTable; import org.apache.metron.enrichment.lookup.LookupKV; +import org.apache.metron.hbase.mock.MockHTable; +import org.apache.metron.hbase.mock.MockHBaseTableProvider; import org.junit.*; import java.io.IOException; @@ -49,7 +50,7 @@ public class TaxiiIntegrationTest { @AfterClass public static void teardown() { MockTaxiiService.shutdown(); - MockHTable.Provider.clear(); + MockHBaseTableProvider.clear(); } /** @@ -91,7 +92,7 @@ public class TaxiiIntegrationTest { @Test public void testTaxii() throws Exception { - final MockHTable.Provider provider = new MockHTable.Provider(); + final MockHBaseTableProvider provider = new MockHBaseTableProvider(); final Configuration config = HBaseConfiguration.create(); TaxiiHandler handler = new TaxiiHandler(TaxiiConnectionConfig.load(taxiiConnectionConfig), new StixExtractor(), config ) { @Override @@ -115,7 +116,7 @@ public class TaxiiIntegrationTest { } Assert.assertTrue(maliciousAddresses.contains("94.102.53.142")); Assert.assertEquals(numStringsMatch(MockTaxiiService.pollMsg, "AddressObj:Address_Value condition=\"Equal\""), maliciousAddresses.size()); - MockHTable.Provider.clear(); + MockHBaseTableProvider.clear(); // Ensure that the handler can be run multiple times without connection issues. handler.run(); http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-elasticsearch/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/pom.xml b/metron-platform/metron-elasticsearch/pom.xml index 6cef1e9..40989c6 100644 --- a/metron-platform/metron-elasticsearch/pom.xml +++ b/metron-platform/metron-elasticsearch/pom.xml @@ -73,6 +73,23 @@ </exclusions> </dependency> <dependency> + <groupId>org.apache.metron</groupId> + <artifactId>metron-hbase</artifactId> + <version>${parent.version}</version> + <scope>test</scope> + <type>test-jar</type> + <exclusions> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>${global_hbase_version}</version> http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java index 64bf4b6..217da84 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java @@ -17,10 +17,24 @@ */ package org.apache.metron.elasticsearch.dao; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; +import org.apache.metron.common.Constants; +import org.apache.metron.common.configuration.writer.WriterConfiguration; +import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; import org.apache.metron.indexing.dao.AccessConfig; +import org.apache.metron.indexing.dao.update.Document; import org.apache.metron.indexing.dao.IndexDao; import org.apache.metron.indexing.dao.search.*; +import org.apache.metron.indexing.dao.search.SearchRequest; +import org.apache.metron.indexing.dao.search.SearchResponse; +import org.elasticsearch.action.get.GetRequestBuilder; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.search.*; +import org.elasticsearch.action.update.UpdateRequest; +import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.client.transport.TransportClient; @@ -36,6 +50,15 @@ import org.elasticsearch.search.aggregations.bucket.terms.StringTerms; import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.sort.*; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.SearchHit; +import org.elasticsearch.search.SearchHits; +import java.io.IOException; +import java.util.Arrays; +import java.util.Date; import java.io.IOException; import java.util.ArrayList; @@ -46,6 +69,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Function; import java.util.stream.Collectors; public class ElasticsearchDao implements IndexDao { @@ -120,6 +144,7 @@ public class ElasticsearchDao implements IndexDao { searchResult.setId(searchHit.getId()); searchResult.setSource(searchHit.getSource()); searchResult.setScore(searchHit.getScore()); + searchResult.setIndex(searchHit.getIndex()); return searchResult; }).collect(Collectors.toList())); if (facetFields.isPresent()) { @@ -135,9 +160,92 @@ public class ElasticsearchDao implements IndexDao { } @Override - public void init(Map<String, Object> globalConfig, AccessConfig config) { - this.client = ElasticsearchUtils.getClient(globalConfig, config.getOptionalSettings()); - this.accessConfig = config; + public synchronized void init(AccessConfig config) { + if(this.client == null) { + this.client = ElasticsearchUtils.getClient(config.getGlobalConfigSupplier().get(), config.getOptionalSettings()); + this.accessConfig = config; + } + } + + @Override + public Document getLatest(final String guid, final String sensorType) throws IOException { + Optional<Document> ret = searchByGuid( + guid + , sensorType + , hit -> { + Long ts = 0L; + String doc = hit.getSourceAsString(); + String sourceType = Iterables.getFirst(Splitter.on("_doc").split(hit.getType()), null); + try { + return Optional.of(new Document(doc, guid, sourceType, ts)); + } catch (IOException e) { + throw new IllegalStateException("Unable to retrieve latest: " + e.getMessage(), e); + } + } + ); + return ret.orElse(null); + } + + /** + * Return the search hit based on the UUID and sensor type. + * A callback can be specified to transform the hit into a type T. + * If more than one hit happens, the first one will be returned. + * @throws IOException + */ + <T> Optional<T> searchByGuid(String guid, String sensorType, Function<SearchHit, Optional<T>> callback) throws IOException{ + QueryBuilder query = QueryBuilders.matchQuery(Constants.GUID, guid); + SearchRequestBuilder request = client.prepareSearch() + .setTypes(sensorType + "_doc") + .setQuery(query) + .setSource("message") + ; + MultiSearchResponse response = client.prepareMultiSearch() + .add(request) + .get(); + for(MultiSearchResponse.Item i : response) { + org.elasticsearch.action.search.SearchResponse resp = i.getResponse(); + SearchHits hits = resp.getHits(); + for(SearchHit hit : hits) { + Optional<T> ret = callback.apply(hit); + if(ret.isPresent()) { + return ret; + } + } + } + return Optional.empty(); + + } + + @Override + public void update(Document update, Optional<String> index) throws IOException { + String indexPostfix = ElasticsearchUtils.getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new Date()); + String sensorType = update.getSensorType(); + String indexName = ElasticsearchUtils.getIndexName(sensorType, indexPostfix, null); + + String type = sensorType + "_doc"; + Object ts = update.getTimestamp(); + IndexRequest indexRequest = new IndexRequest(indexName, type, update.getGuid()) + .source(update.getDocument()) + ; + if(ts != null) { + indexRequest = indexRequest.timestamp(ts.toString()); + } + String existingIndex = index.orElse( + searchByGuid(update.getGuid() + , sensorType + , hit -> Optional.ofNullable(hit.getIndex()) + ).orElse(indexName) + ); + UpdateRequest updateRequest = new UpdateRequest(existingIndex, type, update.getGuid()) + .doc(update.getDocument()) + .upsert(indexRequest) + ; + + try { + client.update(updateRequest).get(); + } catch (Exception e) { + throw new IOException(e.getMessage(), e); + } } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java index d199403..c7c4d90 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java @@ -20,6 +20,7 @@ package org.apache.metron.elasticsearch.utils; import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; +import org.apache.metron.common.configuration.writer.WriterConfiguration; import org.apache.metron.elasticsearch.writer.ElasticsearchWriter; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; @@ -27,14 +28,32 @@ import org.elasticsearch.common.transport.InetSocketTransportAddress; import java.net.InetAddress; import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; +import java.text.SimpleDateFormat; +import java.util.*; public class ElasticsearchUtils { + private static ThreadLocal<Map<String, SimpleDateFormat>> DATE_FORMAT_CACHE + = ThreadLocal.withInitial(() -> new HashMap<>()); + + public static SimpleDateFormat getIndexFormat(WriterConfiguration configurations) { + return getIndexFormat(configurations.getGlobalConfig()); + } + + public static SimpleDateFormat getIndexFormat(Map<String, Object> globalConfig) { + String format = (String) globalConfig.get("es.date.format"); + return DATE_FORMAT_CACHE.get().computeIfAbsent(format, SimpleDateFormat::new); + } + + public static String getIndexName(String sensorType, String indexPostfix, WriterConfiguration configurations) { + String indexName = sensorType; + if (configurations != null) { + indexName = configurations.getIndex(sensorType); + } + indexName = indexName + "_index_" + indexPostfix; + return indexName; + } + public static TransportClient getClient(Map<String, Object> globalConfiguration, Map<String, String> optionalSettings) { Settings.Builder settingsBuilder = Settings.settingsBuilder(); settingsBuilder.put("cluster.name", globalConfiguration.get("es.clustername")); http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java index 39cdda9..dd32532 100644 --- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java +++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java @@ -17,6 +17,13 @@ */ package org.apache.metron.elasticsearch.writer; +import org.apache.metron.common.Constants; +import org.apache.metron.elasticsearch.utils.ElasticsearchUtils; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.tuple.Tuple; +import com.google.common.base.Splitter; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import java.io.Serializable; import java.text.SimpleDateFormat; import java.util.Date; @@ -86,6 +93,11 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria sensorType + "_doc"); indexRequestBuilder = indexRequestBuilder.setSource(esDoc.toJSONString()); + String guid = (String)esDoc.get(Constants.GUID); + if(guid != null) { + indexRequestBuilder.setId(guid); + } + Object ts = esDoc.get("timestamp"); if(ts != null) { indexRequestBuilder = indexRequestBuilder.setTimestamp(ts.toString()); http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchDaoIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchDaoIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchDaoIntegrationTest.java deleted file mode 100644 index ffc41b3..0000000 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchDaoIntegrationTest.java +++ /dev/null @@ -1,147 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.metron.elasticsearch.integration; - - -import org.adrianwalker.multilinestring.Multiline; -import org.apache.metron.elasticsearch.dao.ElasticsearchDao; -import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent; -import org.apache.metron.indexing.dao.AccessConfig; -import org.apache.metron.indexing.dao.IndexDao; -import org.apache.metron.indexing.dao.IndexingDaoIntegrationTest; -import org.apache.metron.integration.InMemoryComponent; -import org.elasticsearch.action.bulk.BulkRequestBuilder; -import org.elasticsearch.action.bulk.BulkResponse; -import org.elasticsearch.action.index.IndexRequestBuilder; -import org.json.simple.JSONArray; -import org.json.simple.JSONObject; -import org.json.simple.parser.JSONParser; -import org.json.simple.parser.ParseException; -import org.junit.Before; -import org.junit.BeforeClass; - -import java.io.File; -import java.util.HashMap; - -public class ElasticsearchDaoIntegrationTest extends IndexingDaoIntegrationTest { - private static String indexDir = "target/elasticsearch_search"; - private static String dateFormat = "yyyy.MM.dd.HH"; - - /** - * { - * "bro_doc": { - * "properties": { - * "source:type": { "type": "string" }, - * "ip_src_addr": { "type": "ip" }, - * "ip_src_port": { "type": "integer" }, - * "long_field": { "type": "long" }, - * "timestamp" : { "type": "date" }, - * "latitude" : { "type": "float" }, - * "double_field": { "type": "double" }, - * "is_alert": { "type": "boolean" }, - * "location_point": { "type": "geo_point" }, - * "bro_field": { "type": "string" }, - * "duplicate_name_field": { "type": "string" } - * } - * } - * } - */ - @Multiline - private static String broTypeMappings; - - /** - * { - * "snort_doc": { - * "properties": { - * "source:type": { "type": "string" }, - * "ip_src_addr": { "type": "ip" }, - * "ip_src_port": { "type": "integer" }, - * "long_field": { "type": "long" }, - * "timestamp" : { "type": "date" }, - * "latitude" : { "type": "float" }, - * "double_field": { "type": "double" }, - * "is_alert": { "type": "boolean" }, - * "location_point": { "type": "geo_point" }, - * "snort_field": { "type": "integer" }, - * "duplicate_name_field": { "type": "integer" } - * } - * } - * } - */ - @Multiline - private static String snortTypeMappings; - - - @Override - protected IndexDao createDao() throws Exception { - IndexDao ret = new ElasticsearchDao(); - ret.init( - new HashMap<String, Object>() {{ - put("es.clustername", "metron"); - put("es.port", "9300"); - put("es.ip", "localhost"); - put("es.date.format", dateFormat); - }}, - new AccessConfig() {{ - setMaxSearchResults(100); - }} - ); - return ret; - } - - @Override - protected InMemoryComponent startIndex() throws Exception { - InMemoryComponent es = new ElasticSearchComponent.Builder() - .withHttpPort(9211) - .withIndexDir(new File(indexDir)) - .build(); - es.start(); - return es; - } - - @Override - protected void loadTestData() throws ParseException { - ElasticSearchComponent es = (ElasticSearchComponent)indexComponent; - es.getClient().admin().indices().prepareCreate("bro_index_2017.01.01.01") - .addMapping("bro_doc", broTypeMappings).get(); - es.getClient().admin().indices().prepareCreate("snort_index_2017.01.01.02") - .addMapping("snort_doc", snortTypeMappings).get(); - - BulkRequestBuilder bulkRequest = es.getClient().prepareBulk().setRefresh(true); - JSONArray broArray = (JSONArray) new JSONParser().parse(broData); - for(Object o: broArray) { - JSONObject jsonObject = (JSONObject) o; - IndexRequestBuilder indexRequestBuilder = es.getClient().prepareIndex("bro_index_2017.01.01.01", "bro_doc"); - indexRequestBuilder = indexRequestBuilder.setSource(jsonObject.toJSONString()); - indexRequestBuilder = indexRequestBuilder.setTimestamp(jsonObject.get("timestamp").toString()); - bulkRequest.add(indexRequestBuilder); - } - JSONArray snortArray = (JSONArray) new JSONParser().parse(snortData); - for(Object o: snortArray) { - JSONObject jsonObject = (JSONObject) o; - IndexRequestBuilder indexRequestBuilder = es.getClient().prepareIndex("snort_index_2017.01.01.02", "snort_doc"); - indexRequestBuilder = indexRequestBuilder.setSource(jsonObject.toJSONString()); - indexRequestBuilder = indexRequestBuilder.setTimestamp(jsonObject.get("timestamp").toString()); - bulkRequest.add(indexRequestBuilder); - } - BulkResponse bulkResponse = bulkRequest.execute().actionGet(); - if (bulkResponse.hasFailures()) { - throw new RuntimeException("Failed to index test data"); - } - } -} http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java new file mode 100644 index 0000000..d794ac9 --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.elasticsearch.integration; + + +import org.adrianwalker.multilinestring.Multiline; +import org.apache.metron.elasticsearch.dao.ElasticsearchDao; +import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent; +import org.apache.metron.indexing.dao.AccessConfig; +import org.apache.metron.indexing.dao.IndexDao; +import org.apache.metron.indexing.dao.SearchIntegrationTest; +import org.apache.metron.integration.InMemoryComponent; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.json.simple.JSONArray; +import org.json.simple.JSONObject; +import org.json.simple.parser.JSONParser; +import org.json.simple.parser.ParseException; + +import java.io.File; +import java.util.HashMap; + +public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest { + private static String indexDir = "target/elasticsearch_search"; + private static String dateFormat = "yyyy.MM.dd.HH"; + + /** + * { + * "bro_doc": { + * "properties": { + * "source:type": { "type": "string" }, + * "ip_src_addr": { "type": "ip" }, + * "ip_src_port": { "type": "integer" }, + * "long_field": { "type": "long" }, + * "timestamp" : { "type": "date" }, + * "latitude" : { "type": "float" }, + * "double_field": { "type": "double" }, + * "is_alert": { "type": "boolean" }, + * "location_point": { "type": "geo_point" }, + * "bro_field": { "type": "string" }, + * "duplicate_name_field": { "type": "string" } + * } + * } + * } + */ + @Multiline + private static String broTypeMappings; + + /** + * { + * "snort_doc": { + * "properties": { + * "source:type": { "type": "string" }, + * "ip_src_addr": { "type": "ip" }, + * "ip_src_port": { "type": "integer" }, + * "long_field": { "type": "long" }, + * "timestamp" : { "type": "date" }, + * "latitude" : { "type": "float" }, + * "double_field": { "type": "double" }, + * "is_alert": { "type": "boolean" }, + * "location_point": { "type": "geo_point" }, + * "snort_field": { "type": "integer" }, + * "duplicate_name_field": { "type": "integer" } + * } + * } + * } + */ + @Multiline + private static String snortTypeMappings; + + + @Override + protected IndexDao createDao() throws Exception { + IndexDao ret = new ElasticsearchDao(); + ret.init( + new AccessConfig() {{ + setMaxSearchResults(100); + setGlobalConfigSupplier( () -> + new HashMap<String, Object>() {{ + put("es.clustername", "metron"); + put("es.port", "9300"); + put("es.ip", "localhost"); + put("es.date.format", dateFormat); + }} + ); + }} + ); + return ret; + } + + @Override + protected InMemoryComponent startIndex() throws Exception { + InMemoryComponent es = new ElasticSearchComponent.Builder() + .withHttpPort(9211) + .withIndexDir(new File(indexDir)) + .build(); + es.start(); + return es; + } + + @Override + protected void loadTestData() throws ParseException { + ElasticSearchComponent es = (ElasticSearchComponent)indexComponent; + es.getClient().admin().indices().prepareCreate("bro_index_2017.01.01.01") + .addMapping("bro_doc", broTypeMappings).get(); + es.getClient().admin().indices().prepareCreate("snort_index_2017.01.01.02") + .addMapping("snort_doc", snortTypeMappings).get(); + + BulkRequestBuilder bulkRequest = es.getClient().prepareBulk().setRefresh(true); + JSONArray broArray = (JSONArray) new JSONParser().parse(broData); + for(Object o: broArray) { + JSONObject jsonObject = (JSONObject) o; + IndexRequestBuilder indexRequestBuilder = es.getClient().prepareIndex("bro_index_2017.01.01.01", "bro_doc"); + indexRequestBuilder = indexRequestBuilder.setSource(jsonObject.toJSONString()); + indexRequestBuilder = indexRequestBuilder.setTimestamp(jsonObject.get("timestamp").toString()); + bulkRequest.add(indexRequestBuilder); + } + JSONArray snortArray = (JSONArray) new JSONParser().parse(snortData); + for(Object o: snortArray) { + JSONObject jsonObject = (JSONObject) o; + IndexRequestBuilder indexRequestBuilder = es.getClient().prepareIndex("snort_index_2017.01.01.02", "snort_doc"); + indexRequestBuilder = indexRequestBuilder.setSource(jsonObject.toJSONString()); + indexRequestBuilder = indexRequestBuilder.setTimestamp(jsonObject.get("timestamp").toString()); + bulkRequest.add(indexRequestBuilder); + } + BulkResponse bulkResponse = bulkRequest.execute().actionGet(); + if (bulkResponse.hasFailures()) { + throw new RuntimeException("Failed to index test data"); + } + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java new file mode 100644 index 0000000..9a1d7a7 --- /dev/null +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.elasticsearch.integration; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.Iterables; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Result; +import org.apache.metron.common.Constants; +import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.elasticsearch.dao.ElasticsearchDao; +import org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent; +import org.apache.metron.hbase.mock.MockHTable; +import org.apache.metron.hbase.mock.MockHBaseTableProvider; +import org.apache.metron.indexing.dao.*; +import org.apache.metron.indexing.dao.update.Document; +import org.apache.metron.indexing.dao.update.ReplaceRequest; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.File; +import java.text.SimpleDateFormat; +import java.util.*; + + +public class ElasticsearchUpdateIntegrationTest { + private static final int MAX_RETRIES = 10; + private static final int SLEEP_MS = 500; + private static final String SENSOR_NAME= "test"; + private static final String TABLE_NAME = "modifications"; + private static final String CF = "p"; + private static String indexDir = "target/elasticsearch_mutation"; + private static String dateFormat = "yyyy.MM.dd.HH"; + private static String index = SENSOR_NAME + "_index_" + new SimpleDateFormat(dateFormat).format(new Date()); + private static MockHTable table; + private static IndexDao esDao; + private static IndexDao hbaseDao; + private static MultiIndexDao dao; + private static ElasticSearchComponent es; + + @BeforeClass + public static void setup() throws Exception { + Configuration config = HBaseConfiguration.create(); + MockHBaseTableProvider tableProvider = new MockHBaseTableProvider(); + tableProvider.addToCache(TABLE_NAME, CF); + table = (MockHTable)tableProvider.getTable(config, TABLE_NAME); + // setup the client + es = new ElasticSearchComponent.Builder() + .withHttpPort(9211) + .withIndexDir(new File(indexDir)) + .build(); + es.start(); + + hbaseDao = new HBaseDao(); + AccessConfig accessConfig = new AccessConfig(); + accessConfig.setTableProvider(tableProvider); + Map<String, Object> globalConfig = new HashMap<String, Object>() {{ + put("es.clustername", "metron"); + put("es.port", "9300"); + put("es.ip", "localhost"); + put("es.date.format", dateFormat); + put(HBaseDao.HBASE_TABLE, TABLE_NAME); + put(HBaseDao.HBASE_CF, CF); + }}; + accessConfig.setGlobalConfigSupplier(() -> globalConfig); + + esDao = new ElasticsearchDao(); + + dao = new MultiIndexDao(hbaseDao, esDao); + dao.init(accessConfig); + + } + + @AfterClass + public static void teardown() { + if(es != null) { + es.stop(); + } + } + + + + @Test + public void test() throws Exception { + List<Map<String, Object>> inputData = new ArrayList<>(); + for(int i = 0; i < 10;++i) { + final String name = "message" + i; + inputData.add( + new HashMap<String, Object>() {{ + put("source:type", SENSOR_NAME); + put("name" , name); + put("timestamp", System.currentTimeMillis()); + put(Constants.GUID, name); + }} + ); + } + es.add(index, SENSOR_NAME + , Iterables.transform(inputData, + m -> { + try { + return JSONUtils.INSTANCE.toJSON(m, true); + } catch (JsonProcessingException e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + ) + ); + List<Map<String,Object>> docs = null; + for(int t = 0;t < MAX_RETRIES;++t, Thread.sleep(SLEEP_MS)) { + docs = es.getAllIndexedDocs(index, SENSOR_NAME + "_doc"); + if(docs.size() >= 10) { + break; + } + } + Assert.assertEquals(10, docs.size()); + //modify the first message and add a new field + { + Map<String, Object> message0 = new HashMap<String, Object>(inputData.get(0)) {{ + put("new-field", "metron"); + }}; + String guid = "" + message0.get(Constants.GUID); + dao.replace(new ReplaceRequest(){{ + setReplacement(message0); + setGuid(guid); + setSensorType(SENSOR_NAME); + }}, Optional.empty()); + Assert.assertEquals(1, table.size()); + Document doc = dao.getLatest(guid, SENSOR_NAME); + Assert.assertEquals(message0, doc.getDocument()); + { + //ensure hbase is up to date + Get g = new Get(guid.getBytes()); + Result r = table.get(g); + NavigableMap<byte[], byte[]> columns = r.getFamilyMap(CF.getBytes()); + Assert.assertEquals(1, columns.size()); + Assert.assertEquals(message0 + , JSONUtils.INSTANCE.load(new String(columns.lastEntry().getValue()) + , new TypeReference<Map<String, Object>>() {}) + ); + } + { + //ensure ES is up-to-date + long cnt = 0; + for (int t = 0; t < MAX_RETRIES && cnt == 0; ++t, Thread.sleep(SLEEP_MS)) { + docs = es.getAllIndexedDocs(index, SENSOR_NAME + "_doc"); + cnt = docs + .stream() + .filter(d -> message0.get("new-field").equals(d.get("new-field"))) + .count(); + } + Assert.assertNotEquals("Elasticsearch is not updated!", cnt, 0); + } + } + //modify the same message and modify the new field + { + Map<String, Object> message0 = new HashMap<String, Object>(inputData.get(0)) {{ + put("new-field", "metron2"); + }}; + String guid = "" + message0.get(Constants.GUID); + dao.replace(new ReplaceRequest(){{ + setReplacement(message0); + setGuid(guid); + setSensorType(SENSOR_NAME); + }}, Optional.empty()); + Assert.assertEquals(1, table.size()); + Document doc = dao.getLatest(guid, SENSOR_NAME); + Assert.assertEquals(message0, doc.getDocument()); + { + //ensure hbase is up to date + Get g = new Get(guid.getBytes()); + Result r = table.get(g); + NavigableMap<byte[], byte[]> columns = r.getFamilyMap(CF.getBytes()); + Assert.assertEquals(2, columns.size()); + Assert.assertEquals(message0, JSONUtils.INSTANCE.load(new String(columns.lastEntry().getValue()) + , new TypeReference<Map<String, Object>>() {}) + ); + Assert.assertNotEquals(message0, JSONUtils.INSTANCE.load(new String(columns.firstEntry().getValue()) + , new TypeReference<Map<String, Object>>() {}) + ); + } + { + //ensure ES is up-to-date + long cnt = 0; + for (int t = 0; t < MAX_RETRIES && cnt == 0; ++t,Thread.sleep(SLEEP_MS)) { + docs = es.getAllIndexedDocs(index, SENSOR_NAME + "_doc"); + cnt = docs + .stream() + .filter(d -> message0.get("new-field").equals(d.get("new-field"))) + .count(); + } + + Assert.assertNotEquals("Elasticsearch is not updated!", cnt, 0); + } + } + } + + + +} http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java index 7766fe3..7facff5 100644 --- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java +++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java @@ -17,13 +17,18 @@ */ package org.apache.metron.elasticsearch.integration.components; +import com.fasterxml.jackson.core.type.TypeReference; import org.apache.commons.io.FileUtils; +import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.integration.InMemoryComponent; import org.apache.metron.integration.UnableToStartException; import org.elasticsearch.ElasticsearchTimeoutException; import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction; import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.bulk.BulkResponse; +import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.cluster.health.ClusterHealthStatus; import org.elasticsearch.action.admin.indices.refresh.RefreshRequest; import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest; @@ -39,6 +44,7 @@ import org.elasticsearch.search.SearchHit; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; @@ -90,6 +96,36 @@ public class ElasticSearchComponent implements InMemoryComponent { } dir.mkdirs(); } + + public BulkResponse add(String indexName, String sensorType, String... docs) throws IOException { + List<String> d = new ArrayList<>(); + Collections.addAll(d, docs); + return add(indexName, sensorType, d); + } + + public BulkResponse add(String indexName, String sensorType, Iterable<String> docs) throws IOException { + BulkRequestBuilder bulkRequest = getClient().prepareBulk(); + for(String doc : docs) { + IndexRequestBuilder indexRequestBuilder = getClient().prepareIndex(indexName, + sensorType + "_doc"); + + indexRequestBuilder = indexRequestBuilder.setSource(doc); + Map<String, Object> esDoc = JSONUtils.INSTANCE.load(doc, new TypeReference<Map<String, Object>>() { + }); + Object ts = esDoc.get("timestamp"); + if(ts != null) { + indexRequestBuilder = indexRequestBuilder.setTimestamp(ts.toString()); + } + bulkRequest.add(indexRequestBuilder); + } + + BulkResponse response = bulkRequest.execute().actionGet(); + if(response.hasFailures()) { + throw new IOException(response.buildFailureMessage()); + } + return response; + } + @Override public void start() throws UnableToStartException { File logDir= new File(indexDir, "/logs"); http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-enrichment/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/pom.xml b/metron-platform/metron-enrichment/pom.xml index e2749c5..37cb49f 100644 --- a/metron-platform/metron-enrichment/pom.xml +++ b/metron-platform/metron-enrichment/pom.xml @@ -62,6 +62,13 @@ </dependency> <dependency> <groupId>org.apache.metron</groupId> + <artifactId>metron-hbase</artifactId> + <version>${project.parent.version}</version> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> <artifactId>metron-profiler-client</artifactId> <version>${project.parent.version}</version> </dependency> http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java index 35a90b7..974f8ab 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java @@ -26,7 +26,8 @@ import org.apache.metron.enrichment.converter.EnrichmentKey; import org.apache.metron.enrichment.converter.EnrichmentValue; import org.apache.metron.enrichment.lookup.EnrichmentLookup; import org.apache.metron.enrichment.converter.EnrichmentHelper; -import org.apache.metron.test.mock.MockHTable; +import org.apache.metron.hbase.mock.MockHTable; +import org.apache.metron.hbase.mock.MockHBaseTableProvider; import org.apache.metron.enrichment.lookup.LookupKV; import org.apache.metron.enrichment.lookup.accesstracker.BloomAccessTracker; import org.apache.metron.enrichment.lookup.accesstracker.PersistentAccessTracker; @@ -102,8 +103,8 @@ public class SimpleHBaseAdapterTest { @Before public void setup() throws Exception { - final MockHTable trackerTable = (MockHTable) MockHTable.Provider.addToCache(atTableName, cf); - final MockHTable hbaseTable = (MockHTable) MockHTable.Provider.addToCache(hbaseTableName, cf); + final MockHTable trackerTable = (MockHTable) MockHBaseTableProvider.addToCache(atTableName, cf); + final MockHTable hbaseTable = (MockHTable) MockHBaseTableProvider.addToCache(hbaseTableName, cf); EnrichmentHelper.INSTANCE.load(hbaseTable, cf, new ArrayList<LookupKV<EnrichmentKey, EnrichmentValue>>() {{ add(new LookupKV<>(new EnrichmentKey(PLAYFUL_CLASSIFICATION_TYPE, "10.0.2.3") , new EnrichmentValue(PLAYFUL_ENRICHMENT) http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java index e96c7a7..e421edc 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java @@ -28,7 +28,8 @@ import org.apache.metron.enrichment.converter.EnrichmentKey; import org.apache.metron.enrichment.converter.EnrichmentValue; import org.apache.metron.enrichment.lookup.EnrichmentLookup; import org.apache.metron.enrichment.converter.EnrichmentHelper; -import org.apache.metron.test.mock.MockHTable; +import org.apache.metron.hbase.mock.MockHTable; +import org.apache.metron.hbase.mock.MockHBaseTableProvider; import org.apache.metron.enrichment.lookup.LookupKV; import org.apache.metron.enrichment.lookup.accesstracker.BloomAccessTracker; import org.apache.metron.enrichment.lookup.accesstracker.PersistentAccessTracker; @@ -98,8 +99,8 @@ public class ThreatIntelAdapterTest { @Before public void setup() throws Exception { - final MockHTable trackerTable = (MockHTable) MockHTable.Provider.addToCache(atTableName, cf); - final MockHTable threatIntelTable = (MockHTable) MockHTable.Provider.addToCache(threatIntelTableName, cf); + final MockHTable trackerTable = (MockHTable) MockHBaseTableProvider.addToCache(atTableName, cf); + final MockHTable threatIntelTable = (MockHTable) MockHBaseTableProvider.addToCache(threatIntelTableName, cf); EnrichmentHelper.INSTANCE.load(threatIntelTable, cf, new ArrayList<LookupKV<EnrichmentKey, EnrichmentValue>>() {{ add(new LookupKV<>(new EnrichmentKey("10.0.2.3", "10.0.2.3"), new EnrichmentValue(new HashMap<>()))); }}); http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java index a9a2fea..f77f16e 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java @@ -25,8 +25,6 @@ import com.google.common.base.Predicates; import com.google.common.base.Splitter; import com.google.common.collect.Iterables; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.metron.TestConstants; import org.apache.metron.common.Constants; import org.apache.metron.common.utils.JSONUtils; @@ -38,7 +36,8 @@ import org.apache.metron.enrichment.integration.components.ConfigUploadComponent import org.apache.metron.enrichment.lookup.LookupKV; import org.apache.metron.enrichment.lookup.accesstracker.PersistentBloomTrackerCreator; import org.apache.metron.enrichment.stellar.SimpleHBaseEnrichmentFunctions; -import org.apache.metron.hbase.TableProvider; +import org.apache.metron.hbase.mock.MockHTable; +import org.apache.metron.hbase.mock.MockHBaseTableProvider; import org.apache.metron.integration.BaseIntegrationTest; import org.apache.metron.integration.ComponentRunner; import org.apache.metron.integration.ProcessorResult; @@ -48,7 +47,6 @@ import org.apache.metron.integration.components.ZKServerComponent; import org.apache.metron.integration.processors.KafkaMessageSet; import org.apache.metron.integration.processors.KafkaProcessor; import org.apache.metron.integration.utils.TestUtils; -import org.apache.metron.test.mock.MockHTable; import org.apache.metron.test.utils.UnitTestHelper; import org.json.simple.parser.ParseException; import org.junit.Assert; @@ -58,7 +56,6 @@ import org.junit.Test; import javax.annotation.Nullable; import java.io.File; import java.io.IOException; -import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -96,13 +93,6 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest { private static File geoHdfsFile; - public static class Provider implements TableProvider, Serializable { - MockHTable.Provider provider = new MockHTable.Provider(); - @Override - public HTableInterface getTable(Configuration config, String tableName) throws IOException { - return provider.getTable(config, tableName); - } - } private static List<byte[]> getInputMessages(String path){ try{ @@ -138,7 +128,7 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest { setProperty("threatintel_error_topic", ERROR_TOPIC); setProperty("enrichment_join_cache_size", "1000"); setProperty("threatintel_join_cache_size", "1000"); - setProperty("enrichment_hbase_provider_impl", "org.apache.metron.enrichment.integration.EnrichmentIntegrationTest\\$Provider"); + setProperty("enrichment_hbase_provider_impl", "" + MockHBaseTableProvider.class.getName()); setProperty("enrichment_table", enrichmentsTableName); setProperty("enrichment_cf", cf); setProperty("enrichment_host_known_hosts", "[{\"ip\":\"10.1.128.236\", \"local\":\"YES\", \"type\":\"webserver\", \"asset_value\" : \"important\"}," + @@ -171,7 +161,7 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest { File globalConfig = new File(new File(TestConstants.SAMPLE_CONFIG_PATH), "global.json"); Map<String, Object> config = JSONUtils.INSTANCE.load(globalConfig, new TypeReference<Map<String, Object>>() { }); - config.put(SimpleHBaseEnrichmentFunctions.TABLE_PROVIDER_TYPE_CONF, Provider.class.getName()); + config.put(SimpleHBaseEnrichmentFunctions.TABLE_PROVIDER_TYPE_CONF, MockHBaseTableProvider.class.getName()); config.put(SimpleHBaseEnrichmentFunctions.ACCESS_TRACKER_TYPE_CONF, "PERSISTENT_BLOOM"); config.put(PersistentBloomTrackerCreator.Config.PERSISTENT_BLOOM_TABLE, trackerHBaseTableName); config.put(PersistentBloomTrackerCreator.Config.PERSISTENT_BLOOM_CF, cf); @@ -184,12 +174,12 @@ public class EnrichmentIntegrationTest extends BaseIntegrationTest { .withEnrichmentConfigsPath(TestConstants.SAMPLE_CONFIG_PATH); //create MockHBaseTables - final MockHTable trackerTable = (MockHTable) MockHTable.Provider.addToCache(trackerHBaseTableName, cf); - final MockHTable threatIntelTable = (MockHTable) MockHTable.Provider.addToCache(threatIntelTableName, cf); + final MockHTable trackerTable = (MockHTable) MockHBaseTableProvider.addToCache(trackerHBaseTableName, cf); + final MockHTable threatIntelTable = (MockHTable) MockHBaseTableProvider.addToCache(threatIntelTableName, cf); EnrichmentHelper.INSTANCE.load(threatIntelTable, cf, new ArrayList<LookupKV<EnrichmentKey, EnrichmentValue>>() {{ add(new LookupKV<>(new EnrichmentKey(MALICIOUS_IP_TYPE, "10.0.2.3"), new EnrichmentValue(new HashMap<>()))); }}); - final MockHTable enrichmentTable = (MockHTable) MockHTable.Provider.addToCache(enrichmentsTableName, cf); + final MockHTable enrichmentTable = (MockHTable) MockHBaseTableProvider.addToCache(enrichmentsTableName, cf); EnrichmentHelper.INSTANCE.load(enrichmentTable, cf, new ArrayList<LookupKV<EnrichmentKey, EnrichmentValue>>() {{ add(new LookupKV<>(new EnrichmentKey(PLAYFUL_CLASSIFICATION_TYPE, "10.0.2.3") , new EnrichmentValue(PLAYFUL_ENRICHMENT) http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java index 5901d9f..e4625ac 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java @@ -18,10 +18,12 @@ package org.apache.metron.enrichment.integration.components; import org.apache.curator.framework.CuratorFramework; +import org.apache.metron.common.configuration.ConfigurationsUtils; import org.apache.metron.common.configuration.SensorParserConfig; import org.apache.metron.integration.InMemoryComponent; import org.apache.metron.integration.UnableToStartException; import org.apache.metron.integration.components.ZKServerComponent; +import org.apache.zookeeper.KeeperException; import java.util.HashMap; import java.util.Map; @@ -34,6 +36,7 @@ import static org.apache.metron.common.configuration.ConfigurationsUtils.*; public class ConfigUploadComponent implements InMemoryComponent { + private String connectionString; private Properties topologyProperties; private String globalConfigPath; private String parserConfigsPath; @@ -43,6 +46,12 @@ public class ConfigUploadComponent implements InMemoryComponent { private Optional<Consumer<ConfigUploadComponent>> postStartCallback = Optional.empty(); private Optional<String> globalConfig = Optional.empty(); private Map<String, SensorParserConfig> parserSensorConfigs = new HashMap<>(); + + public ConfigUploadComponent withConnectionString(String connectionString) { + this.connectionString = connectionString; + return this; + } + public ConfigUploadComponent withTopologyProperties(Properties topologyProperties) { this.topologyProperties = topologyProperties; return this; @@ -129,7 +138,7 @@ public class ConfigUploadComponent implements InMemoryComponent { public void update() throws UnableToStartException { try { - final String zookeeperUrl = topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY); + final String zookeeperUrl = connectionString == null?topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY):connectionString; if(globalConfigPath != null || parserConfigsPath != null @@ -157,6 +166,7 @@ public class ConfigUploadComponent implements InMemoryComponent { } + public SensorParserConfig getSensorParserConfig(String sensorType) { SensorParserConfig sensorParserConfig = new SensorParserConfig(); CuratorFramework client = getClient(topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY)); http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/mock/MockTableProvider.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/mock/MockTableProvider.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/mock/MockTableProvider.java deleted file mode 100644 index ac2904a..0000000 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/mock/MockTableProvider.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.metron.enrichment.integration.mock; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.metron.hbase.TableProvider; -import org.apache.metron.test.mock.MockHTable; - -import java.io.IOException; -import java.io.Serializable; - -public class MockTableProvider implements TableProvider, Serializable { - static MockHTable.Provider provider = new MockHTable.Provider(); - @Override - public HTableInterface getTable(Configuration config, String tableName) throws IOException { - return provider.getTable(config, tableName); - } - public static void addTable(String tableName, String... cf) { - provider.addToCache(tableName, cf); - } - public static MockHTable getTable(String tableName) { - try { - return (MockHTable) provider.getTable(null, tableName); - } catch (IOException e) { - throw new RuntimeException("Unable to get table: " + tableName); - } - } -} http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctionsTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctionsTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctionsTest.java index d0d41f7..dbbc7d5 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctionsTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctionsTest.java @@ -19,8 +19,8 @@ package org.apache.metron.enrichment.stellar; import com.google.common.collect.ImmutableMap; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.metron.hbase.mock.MockHTable; +import org.apache.metron.hbase.mock.MockHBaseTableProvider; import org.apache.metron.stellar.dsl.Context; import org.apache.metron.stellar.dsl.DefaultVariableResolver; import org.apache.metron.stellar.dsl.ParseException; @@ -31,13 +31,10 @@ import org.apache.metron.enrichment.converter.EnrichmentHelper; import org.apache.metron.enrichment.converter.EnrichmentKey; import org.apache.metron.enrichment.converter.EnrichmentValue; import org.apache.metron.enrichment.lookup.LookupKV; -import org.apache.metron.hbase.TableProvider; -import org.apache.metron.test.mock.MockHTable; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -49,18 +46,12 @@ public class SimpleHBaseEnrichmentFunctionsTest { private String cf = "cf"; private static Context context; - public static class TP implements TableProvider { - @Override - public HTableInterface getTable(Configuration config, String tableName) throws IOException { - return MockHTable.Provider.getFromCache(tableName); - } - } @Before public void setup() throws Exception { - final MockHTable hbaseTable = (MockHTable) MockHTable.Provider.addToCache(hbaseTableName, cf); + final MockHTable hbaseTable = (MockHTable) MockHBaseTableProvider.addToCache(hbaseTableName, cf); EnrichmentHelper.INSTANCE.load(hbaseTable, cf, new ArrayList<LookupKV<EnrichmentKey, EnrichmentValue>>() {{ for(int i = 0;i < 5;++i) { add(new LookupKV<>(new EnrichmentKey(ENRICHMENT_TYPE, "indicator" + i) @@ -72,7 +63,7 @@ public class SimpleHBaseEnrichmentFunctionsTest { context = new Context.Builder() .with( Context.Capabilities.GLOBAL_CONFIG , () -> ImmutableMap.of( SimpleHBaseEnrichmentFunctions.TABLE_PROVIDER_TYPE_CONF - , TP.class.getName() + , MockHBaseTableProvider.class.getName() ) ) .build(); http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-hbase-client/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-hbase-client/pom.xml b/metron-platform/metron-hbase-client/pom.xml new file mode 100644 index 0000000..5dd6127 --- /dev/null +++ b/metron-platform/metron-hbase-client/pom.xml @@ -0,0 +1,100 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software + Foundation (ASF) under one or more contributor license agreements. See the + NOTICE file distributed with this work for additional information regarding + copyright ownership. The ASF licenses this file to You under the Apache License, + Version 2.0 (the "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed + under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES + OR CONDITIONS OF ANY KIND, either express or implied. See the License for + the specific language governing permissions and limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.metron</groupId> + <artifactId>metron-platform</artifactId> + <version>0.4.1</version> + </parent> + <!-- + The purpose of this project is to provide a version of the hbase client which has a shaded and relocated gauva. + This was motivated by the metron-rest project having transitive dependencies (swagger) which conflict strongly + with the transitive dependencies of hbase-client (specifically guava incompatibilities). + + The HBase team provides a shaded and relocated client artifact, BUT some of the *other* transitive dependencies + are not shaded and relocated properly. The purpose of this project is to add the transitive dependencies with + their properly relocated package prefixes. Currently we have not chosen to retrofit the rest of the project with + this dependency due to the significance of the change, but it may be useful to do so in the future. + --> + <artifactId>metron-hbase-client</artifactId> + <name>metron-hbase-client</name> + <url>https://metron.apache.org/</url> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + <!-- Note: This is 1.1.2 and not ${global_hbase_version} to avoid https://issues.apache.org/jira/browse/HBASE-13889 --> + <!-- when we migrate to a version > 1.1.2, this can be set to ${global_hbase_version} --> + <shaded.client.version>1.1.2</shaded.client.version> + </properties> + <dependencies> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-shaded-client</artifactId> + <version>${shaded.client.version}</version> + </dependency> + </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>${global_jar_version}</version> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>${global_shade_version}</version> + <configuration> + <createDependencyReducedPom>true</createDependencyReducedPom> + <artifactSet> + <excludes> + <exclude>*slf4j*</exclude> + </excludes> + </artifactSet> + </configuration> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <relocations> + <relocation> + <pattern>org.apache.commons.logging</pattern> + <shadedPattern>org.apache.hadoop.hbase.shaded.org.apache.commons.logging</shadedPattern> + </relocation> + <relocation> + <pattern>org.apache.log4j</pattern> + <shadedPattern>org.apache.hadoop.hbase.shaded.org.apache.log4j</shadedPattern> + </relocation> + </relocations> + </configuration> + </execution> + </executions> + </plugin> + + </plugins> + </build> +</project> http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-hbase/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-hbase/pom.xml b/metron-platform/metron-hbase/pom.xml index cd5f641..180134f 100644 --- a/metron-platform/metron-hbase/pom.xml +++ b/metron-platform/metron-hbase/pom.xml @@ -232,4 +232,20 @@ <scope>provided</scope> </dependency> </dependencies> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>${global_jar_version}</version> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> </project> http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TableProvider.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TableProvider.java b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TableProvider.java index dc0569e..1804697 100644 --- a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TableProvider.java +++ b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TableProvider.java @@ -22,7 +22,16 @@ import org.apache.hadoop.hbase.client.HTableInterface; import java.io.IOException; import java.io.Serializable; +import java.lang.reflect.InvocationTargetException; +import java.util.function.Supplier; public interface TableProvider extends Serializable { - HTableInterface getTable(Configuration config, String tableName) throws IOException; + HTableInterface getTable(Configuration config, String tableName) throws IOException; + static TableProvider create(String impl, Supplier<TableProvider> defaultSupplier) throws ClassNotFoundException, NoSuchMethodException, IllegalAccessException, InvocationTargetException, InstantiationException { + if(impl == null) { + return defaultSupplier.get(); + } + Class<? extends TableProvider> clazz = (Class<? extends TableProvider>) Class.forName(impl); + return clazz.getConstructor().newInstance(); + } } http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/mock/MockHBaseTableProvider.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/mock/MockHBaseTableProvider.java b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/mock/MockHBaseTableProvider.java new file mode 100644 index 0000000..57981ac --- /dev/null +++ b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/mock/MockHBaseTableProvider.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.hbase.mock; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.metron.hbase.TableProvider; + +import java.io.IOException; +import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; + +public class MockHBaseTableProvider implements Serializable, TableProvider { + private static Map<String, HTableInterface> _cache = new HashMap<>(); + public HTableInterface getTable(Configuration config, String tableName) throws IOException { + HTableInterface ret = _cache.get(tableName); + return ret; + } + + public static HTableInterface getFromCache(String tableName) { + return _cache.get(tableName); + } + + public static HTableInterface addToCache(String tableName, String... columnFamilies) { + MockHTable ret = new MockHTable(tableName, columnFamilies); + _cache.put(tableName, ret); + return ret; + } + + public static void clear() { + _cache.clear(); + } +}
