Repository: metron Updated Branches: refs/heads/master c4c930f7c -> fd896fbeb
http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java index 1775018..3103ea7 100644 --- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java @@ -18,13 +18,19 @@ package org.apache.metron.indexing.dao; -import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.NavigableMap; import java.util.Optional; + +import com.google.common.hash.Hasher; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HTableInterface; @@ -32,7 +38,9 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.util.Bytes; import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.common.utils.KeyUtil; import org.apache.metron.indexing.dao.search.FieldType; +import org.apache.metron.indexing.dao.search.GetRequest; import org.apache.metron.indexing.dao.search.GroupRequest; import org.apache.metron.indexing.dao.search.GroupResponse; import org.apache.metron.indexing.dao.search.InvalidSearchException; @@ -46,7 +54,7 @@ import org.apache.metron.indexing.dao.update.Document; * * Get document * * The mechanism here is that updates to documents will be added to a HBase Table as a write-ahead log. - * The Key for a row supporting a given document will be the GUID, which should be sufficiently distributed. + * The Key for a row supporting a given document will be the GUID plus the sensor type, which should be sufficiently distributed. * Every new update will have a column added (column qualifier will be the timestamp of the update). * Upon retrieval, the most recent column will be returned. * @@ -57,6 +65,72 @@ public class HBaseDao implements IndexDao { private HTableInterface tableInterface; private byte[] cf; private AccessConfig config; + + /** + * Implements the HBaseDao row key and exposes convenience methods for serializing/deserializing the row key. + * The row key is made of a GUID and sensor type along with a prefix to ensure data is distributed evenly. + */ + public static class Key { + private String guid; + private String sensorType; + public Key(String guid, String sensorType) { + this.guid = guid; + this.sensorType = sensorType; + } + + public String getGuid() { + return guid; + } + + public String getSensorType() { + return sensorType; + } + + public static Key fromBytes(byte[] buffer) throws IOException { + ByteArrayInputStream baos = new ByteArrayInputStream(buffer); + DataInputStream w = new DataInputStream(baos); + baos.skip(KeyUtil.HASH_PREFIX_SIZE); + return new Key(w.readUTF(), w.readUTF()); + } + + public byte[] toBytes() throws IOException { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + if(getGuid() == null || getSensorType() == null) { + throw new IllegalStateException("Guid and sensor type must not be null: guid = " + getGuid() + ", sensorType = " + getSensorType()); + } + DataOutputStream w = new DataOutputStream(baos); + w.writeUTF(getGuid()); + w.writeUTF(getSensorType()); + w.flush(); + byte[] key = baos.toByteArray(); + byte[] prefix = KeyUtil.INSTANCE.getPrefix(key); + return KeyUtil.INSTANCE.merge(prefix, key); + } + + public static byte[] toBytes(Key k) throws IOException { + return k.toBytes(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + Key key = (Key) o; + + if (getGuid() != null ? !getGuid().equals(key.getGuid()) : key.getGuid() != null) return false; + return getSensorType() != null ? getSensorType().equals(key.getSensorType()) : key.getSensorType() == null; + + } + + @Override + public int hashCode() { + int result = getGuid() != null ? getGuid().hashCode() : 0; + result = 31 * result + (getSensorType() != null ? getSensorType().hashCode() : 0); + return result; + } + } + public HBaseDao() { } @@ -102,9 +176,32 @@ public class HBaseDao implements IndexDao { @Override public synchronized Document getLatest(String guid, String sensorType) throws IOException { - Get get = new Get(guid.getBytes()); + Key k = new Key(guid, sensorType); + Get get = new Get(Key.toBytes(k)); get.addFamily(cf); Result result = getTableInterface().get(get); + return getDocumentFromResult(result); + } + + @Override + public Iterable<Document> getAllLatest( + List<GetRequest> getRequests) throws IOException { + List<Get> gets = new ArrayList<>(); + for (GetRequest getRequest: getRequests) { + gets.add(buildGet(getRequest)); + } + Result[] results = getTableInterface().get(gets); + List<Document> allLatest = new ArrayList<>(); + for (Result result: results) { + Document d = getDocumentFromResult(result); + if (d != null) { + allLatest.add(d); + } + } + return allLatest; + } + + private Document getDocumentFromResult(Result result) throws IOException { NavigableMap<byte[], byte[]> columns = result.getFamilyMap( cf); if(columns == null || columns.size() == 0) { return null; @@ -112,8 +209,14 @@ public class HBaseDao implements IndexDao { Map.Entry<byte[], byte[]> entry= columns.lastEntry(); Long ts = Bytes.toLong(entry.getKey()); if(entry.getValue()!= null) { - String json = new String(entry.getValue()); - return new Document(json, guid, sensorType, ts); + Map<String, Object> json = JSONUtils.INSTANCE.load(new String(entry.getValue()), + new TypeReference<Map<String, Object>>() {}); + try { + Key k = Key.fromBytes(result.getRow()); + return new Document(json, k.getGuid(), k.getSensorType(), ts); + } catch (IOException e) { + throw new RuntimeException("Unable to convert row key to a document", e); + } } else { return null; @@ -126,8 +229,6 @@ public class HBaseDao implements IndexDao { getTableInterface().put(put); } - - @Override public void batchUpdate(Map<Document, Optional<String>> updates) throws IOException { List<Put> puts = new ArrayList<>(); @@ -140,8 +241,16 @@ public class HBaseDao implements IndexDao { getTableInterface().put(puts); } - protected Put buildPut(Document update) throws JsonProcessingException { - Put put = new Put(update.getGuid().getBytes()); + protected Get buildGet(GetRequest getRequest) throws IOException { + Key k = new Key(getRequest.getGuid(), getRequest.getSensorType()); + Get get = new Get(Key.toBytes(k)); + get.addFamily(cf); + return get; + } + + protected Put buildPut(Document update) throws IOException { + Key k = new Key(update.getGuid(), update.getSensorType()); + Put put = new Put(Key.toBytes(k)); long ts = update.getTimestamp() == null ? System.currentTimeMillis() : update.getTimestamp(); byte[] columnQualifier = Bytes.toBytes(ts); byte[] doc = JSONUtils.INSTANCE.toJSONPretty(update.getDocument()); @@ -149,6 +258,7 @@ public class HBaseDao implements IndexDao { return put; } + @Override public Map<String, Map<String, FieldType>> getColumnMetadata(List<String> indices) throws IOException { return null; http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java index 4b7829e..8855a14 100644 --- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java @@ -36,6 +36,10 @@ import org.apache.metron.indexing.dao.update.OriginalNotFoundException; import org.apache.metron.indexing.dao.update.PatchRequest; import org.apache.metron.indexing.dao.update.ReplaceRequest; +/** + * The IndexDao provides a common interface for retrieving and storing data in a variety of persistent stores. + * Document reads and writes require a GUID and sensor type with an index being optional. + */ public interface IndexDao { /** @@ -66,6 +70,15 @@ public interface IndexDao { Document getLatest(String guid, String sensorType) throws IOException; /** + * Return a list of the latest versions of documents given a list of GUIDs and sensor types. + * + * @param getRequests A list of get requests for documents + * @return A list of documents matching or an empty list in not available. + * @throws IOException + */ + Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException; + + /** * Return the latest version of a document given a GetRequest. * @param request The GetRequest which indicates the GUID and sensor type. * @return Optionally the document (dependent upon existence in the index). @@ -82,7 +95,9 @@ public interface IndexDao { } /** - * Update given a Document and optionally the index where the document exists. + * Update a given Document and optionally the index where the document exists. This is a full update, + * meaning the current document will be replaced if it exists or a new document will be created if it does + * not exist. Partial updates are not supported in this method. * * @param update The document to replace from the index. * @param index The index where the document lives. @@ -91,7 +106,7 @@ public interface IndexDao { void update(Document update, Optional<String> index) throws IOException; /** - * Update given a Document and optionally the index where the document exists. + * Similar to the update method but accepts multiple documents and performs updates in batch. * * @param updates A map of the documents to update to the index where they live. * @throws IOException @@ -108,6 +123,13 @@ public interface IndexDao { default void patch( PatchRequest request , Optional<Long> timestamp ) throws OriginalNotFoundException, IOException { + Document d = getPatchedDocument(request, timestamp); + update(d, Optional.ofNullable(request.getIndex())); + } + + default Document getPatchedDocument(PatchRequest request + , Optional<Long> timestamp + ) throws OriginalNotFoundException, IOException { Map<String, Object> latest = request.getSource(); if(latest == null) { Document latestDoc = getLatest(request.getGuid(), request.getSensorType()); @@ -121,13 +143,11 @@ public interface IndexDao { JsonNode originalNode = JSONUtils.INSTANCE.convert(latest, JsonNode.class); JsonNode patched = JSONUtils.INSTANCE.applyPatch(request.getPatch(), originalNode); Map<String, Object> updated = JSONUtils.INSTANCE.getMapper() - .convertValue(patched, new TypeReference<Map<String, Object>>() {}); - Document d = new Document( updated - , request.getGuid() - , request.getSensorType() - , timestamp.orElse(System.currentTimeMillis()) - ); - update(d, Optional.ofNullable(request.getIndex())); + .convertValue(patched, new TypeReference<Map<String, Object>>() {}); + return new Document( updated + , request.getGuid() + , request.getSensorType() + , timestamp.orElse(System.currentTimeMillis())); } /** http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MetaAlertDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MetaAlertDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MetaAlertDao.java index de12f22..4530d2a 100644 --- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MetaAlertDao.java +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MetaAlertDao.java @@ -18,13 +18,50 @@ package org.apache.metron.indexing.dao; +import java.util.List; +import java.util.Optional; import java.io.IOException; import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest; import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse; +import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus; +import org.apache.metron.indexing.dao.search.GetRequest; import org.apache.metron.indexing.dao.search.InvalidCreateException; import org.apache.metron.indexing.dao.search.InvalidSearchException; import org.apache.metron.indexing.dao.search.SearchResponse; +/** + * The MetaAlertDao exposes methods for interacting with meta alerts. Meta alerts are objects that contain + * alerts and summary statistics based on the scores of these alerts. Meta alerts are returned in searches + * just as alerts are and match based on the field values of child alerts. If a child alert matches a search + * the meta alert will be returned while the original child alert will not. A meta alert also contains a + * status field that controls it's inclusion in search results and a groups field that can be used to track + * the groups a meta alert was created from. + * + * The structure of a meta alert is as follows: + * { + * "guid": "meta alert guid", + * "timestamp": timestamp, + * "source:type": "metaalert", + * "alerts": [ array of child alerts ], + * "status": "active or inactive", + * "groups": [ array of group names ], + * "average": 10, + * "max": 10, + * "threat:triage:score": 30, + * "count": 3, + * "sum": 30, + * "min": 10, + * "median": 10 + * } + * + * A child alert that has been added to a meta alert will store the meta alert GUID in a "metaalerts" field. + * This field is an array of meta alert GUIDs, meaning a child alert can be contained in multiple meta alerts. + * Any update to a child alert will trigger an update to the meta alert so that the alert inside a meta alert + * and the original alert will be kept in sync. + * + * Other fields can be added to a meta alert through the patch method on the IndexDao interface. However, attempts + * to directly change the "alerts" or "status" field will result in an exception. + */ public interface MetaAlertDao extends IndexDao { String METAALERTS_INDEX = "metaalert_index"; @@ -46,21 +83,65 @@ public interface MetaAlertDao extends IndexDao { SearchResponse getAllMetaAlertsForAlert(String guid) throws InvalidSearchException; /** - * Create a meta alert. - * @param request The parameters for creating the new meta alert - * @return A response indicating success or failure + * Creates a meta alert from a list of child alerts. The most recent version of each child alert is + * retrieved using the DAO abstractions. + * + * @param request A request object containing get requests for alerts to be added and a list of groups + * @return A response indicating success or failure along with the GUID of the new meta alert * @throws InvalidCreateException If a malformed create request is provided * @throws IOException If a problem occurs during communication */ MetaAlertCreateResponse createMetaAlert(MetaAlertCreateRequest request) throws InvalidCreateException, IOException; + + /** + * Adds a list of alerts to an existing meta alert. This will add each alert object to the "alerts" array in the meta alert + * and also add the meta alert GUID to each child alert's "metaalerts" array. After alerts have been added the + * meta alert scores are recalculated. Any alerts already in the meta alert are skipped and no updates are + * performed if all of the alerts are already in the meta alert. The most recent version of each child alert is + * retrieved using the DAO abstractions. Alerts cannot be added to an 'inactive' meta alert. + * + * @param metaAlertGuid The meta alert GUID + * @param getRequests Get requests for alerts to be added + * @return True or false depending on if any alerts were added + * @throws IOException + */ + boolean addAlertsToMetaAlert(String metaAlertGuid, List<GetRequest> getRequests) throws IOException; + + /** + * Removes a list of alerts from an existing meta alert. This will remove each alert object from the "alerts" array in the meta alert + * and also remove the meta alert GUID from each child alert's "metaalerts" array. After alerts have been removed the + * meta alert scores are recalculated. Any alerts not contained in the meta alert are skipped and no updates are + * performed if no alerts can be found in the meta alert. Alerts cannot be removed from an 'inactive' meta alert. + * + * @param metaAlertGuid The meta alert GUID + * @param getRequests Get requests for alerts to be removed + * @return True or false depending on if any alerts were removed + * @throws IOException + */ + boolean removeAlertsFromMetaAlert(String metaAlertGuid, List<GetRequest> getRequests) throws IOException; + + /** + * The meta alert status field can be set to either 'active' or 'inactive' and will control whether or not meta alerts + * (and child alerts) appear in search results. An 'active' status will cause meta alerts to appear in search + * results instead of it's child alerts and an 'inactive' status will suppress the meta alert from search results + * with child alerts appearing in search results as normal. A change to 'inactive' will cause the meta alert GUID to + * be removed from all it's child alert's "metaalerts" field. A change back to 'active' will have the opposite effect. + * + * @param metaAlertGuid The GUID of the meta alert + * @param status A status value of 'active' or 'inactive' + * @return True or false depending on if the status was changed + * @throws IOException + */ + boolean updateMetaAlertStatus(String metaAlertGuid, MetaAlertStatus status) throws IOException; + /** * Initializes a Meta Alert DAO with default "sum" meta alert threat sorting. * @param indexDao The DAO to wrap for our queries. */ default void init(IndexDao indexDao) { - init(indexDao, null); + init(indexDao, Optional.empty()); } /** @@ -69,5 +150,5 @@ public interface MetaAlertDao extends IndexDao { * @param threatSort The aggregation to use as the threat field. E.g. "sum", "median", etc. * null is "sum" */ - void init(IndexDao indexDao, String threatSort); + void init(IndexDao indexDao, Optional<String> threatSort); } http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java index 779e6c6..ed8bc95 100644 --- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java @@ -30,6 +30,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.metron.indexing.dao.search.FieldType; +import org.apache.metron.indexing.dao.search.GetRequest; import org.apache.metron.indexing.dao.search.GroupRequest; import org.apache.metron.indexing.dao.search.GroupResponse; import org.apache.metron.indexing.dao.search.InvalidSearchException; @@ -127,6 +128,25 @@ public class MultiIndexDao implements IndexDao { } + private static class DocumentIterableContainer { + private Optional<Iterable<Document>> d = Optional.empty(); + private Optional<Throwable> t = Optional.empty(); + public DocumentIterableContainer(Iterable<Document> d) { + this.d = Optional.ofNullable(d); + } + public DocumentIterableContainer(Throwable t) { + this.t = Optional.ofNullable(t); + } + + public Optional<Iterable<Document>> getDocumentIterable() { + return d; + } + public Optional<Throwable> getException() { + return t; + } + + } + @Override public SearchResponse search(SearchRequest searchRequest) throws InvalidSearchException { for(IndexDao dao : indices) { @@ -189,6 +209,40 @@ public class MultiIndexDao implements IndexDao { return ret; } + @Override + public Iterable<Document> getAllLatest( + List<GetRequest> getRequests) throws IOException { + Iterable<Document> ret = null; + List<DocumentIterableContainer> output = + indices.parallelStream().map(dao -> { + try { + return new DocumentIterableContainer(dao.getAllLatest(getRequests)); + } catch (Throwable e) { + return new DocumentIterableContainer(e); + } + }).collect(Collectors.toList()); + + List<String> error = new ArrayList<>(); + for(DocumentIterableContainer dc : output) { + if(dc.getException().isPresent()) { + Throwable e = dc.getException().get(); + error.add(e.getMessage() + "\n" + ExceptionUtils.getStackTrace(e)); + } + else { + if(dc.getDocumentIterable().isPresent()) { + Iterable<Document> documents = dc.getDocumentIterable().get(); + if(ret == null) { + ret = documents; + } + } + } + } + if(error.size() > 0) { + throw new IOException(Joiner.on("\n").join(error)); + } + return ret; + } + public List<IndexDao> getIndices() { return indices; } http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertAddRemoveRequest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertAddRemoveRequest.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertAddRemoveRequest.java new file mode 100644 index 0000000..6183d37 --- /dev/null +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertAddRemoveRequest.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with the License. You may obtain + * a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.metron.indexing.dao.metaalert; + +import java.util.Collection; +import java.util.List; +import org.apache.metron.indexing.dao.search.GetRequest; + +public class MetaAlertAddRemoveRequest { + + private String metaAlertGuid; + private List<GetRequest> alerts; + + public String getMetaAlertGuid() { + return metaAlertGuid; + } + + public void setMetaAlertGuid(String metaAlertGuid) { + this.metaAlertGuid = metaAlertGuid; + } + + public List<GetRequest> getAlerts() { + return alerts; + } + + public void setAlerts(List<GetRequest> alerts) { + this.alerts = alerts; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertCreateRequest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertCreateRequest.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertCreateRequest.java index 388527a..d368b3a 100644 --- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertCreateRequest.java +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertCreateRequest.java @@ -22,23 +22,23 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.metron.indexing.dao.search.GetRequest; public class MetaAlertCreateRequest { - // A map from the alert GUID to the Document index - private Map<String, String> guidToIndices; + private List<GetRequest> alerts; private List<String> groups; public MetaAlertCreateRequest() { - this.guidToIndices = new HashMap<>(); + this.alerts = new ArrayList<>(); this.groups = new ArrayList<>(); } - public Map<String, String> getGuidToIndices() { - return guidToIndices; + public List<GetRequest> getAlerts() { + return alerts; } - public void setGuidToIndices(Map<String, String> guidToIndices) { - this.guidToIndices = guidToIndices; + public void setAlerts(List<GetRequest> alerts) { + this.alerts = alerts; } public List<String> getGroups() { http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertStatus.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertStatus.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertStatus.java new file mode 100644 index 0000000..c9b0138 --- /dev/null +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertStatus.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.indexing.dao.metaalert; + +public enum MetaAlertStatus { + ACTIVE("active"), + INACTIVE("inactive"); + + private String statusString; + + MetaAlertStatus(String statusString) { + this.statusString = statusString; + } + + public String getStatusString() { + return statusString; + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GetRequest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GetRequest.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GetRequest.java index eb255dc..959d4e6 100644 --- a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GetRequest.java +++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GetRequest.java @@ -17,9 +17,27 @@ */ package org.apache.metron.indexing.dao.search; +import com.fasterxml.jackson.annotation.JsonGetter; +import java.util.Optional; + public class GetRequest { - String guid; - String sensorType; + private String guid; + private String sensorType; + private String index; + + public GetRequest() { + } + + public GetRequest(String guid, String sensorType) { + this.guid = guid; + this.sensorType = sensorType; + } + + public GetRequest(String guid, String sensorType, String index) { + this.guid = guid; + this.sensorType = sensorType; + this.index = index; + } /** * The GUID of the document @@ -44,4 +62,17 @@ public class GetRequest { public void setSensorType(String sensorType) { this.sensorType = sensorType; } + + public Optional<String> getIndex() { + return index != null ? Optional.of(this.index) : Optional.empty(); + } + + @JsonGetter("index") + public String getIndexString() { + return index; + } + + public void setIndex(String index) { + this.index = index; + } } http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java index f48187e..3bce4d0 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java @@ -17,10 +17,13 @@ */ package org.apache.metron.indexing.dao; +import static org.apache.metron.common.Constants.SENSOR_TYPE; + import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Splitter; import com.google.common.collect.ComparisonChain; import com.google.common.collect.Iterables; +import java.util.stream.Collectors; import org.apache.metron.common.Constants; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.indexing.dao.search.*; @@ -32,7 +35,7 @@ import java.util.*; public class InMemoryDao implements IndexDao { // Map from index to list of documents as JSON strings public static Map<String, List<String>> BACKING_STORE = new HashMap<>(); - public static Map<String, Map<String, FieldType>> COLUMN_METADATA; + public static Map<String, Map<String, FieldType>> COLUMN_METADATA = new HashMap<>(); private AccessConfig config; @Override @@ -169,7 +172,7 @@ public class InMemoryDao implements IndexDao { return false; } - private static Map<String, Object> parse(String doc) { + public static Map<String, Object> parse(String doc) { try { return JSONUtils.INSTANCE.load(doc, new TypeReference<Map<String, Object>>() {}); } catch (IOException e) { @@ -199,6 +202,24 @@ public class InMemoryDao implements IndexDao { } @Override + public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException { + List<Document> documents = new ArrayList<>(); + for(Map.Entry<String, List<String>> kv: BACKING_STORE.entrySet()) { + for(String doc : kv.getValue()) { + Map<String, Object> docParsed = parse(doc); + String guid = (String) docParsed.getOrDefault(Constants.GUID, ""); + for (GetRequest getRequest: getRequests) { + if(getRequest.getGuid().equals(guid)) { + documents.add(new Document(doc, guid, getRequest.getSensorType(), 0L)); + } + } + + } + } + return documents; + } + + @Override public void update(Document update, Optional<String> index) throws IOException { for (Map.Entry<String, List<String>> kv : BACKING_STORE.entrySet()) { if (kv.getKey().startsWith(update.getSensorType())) { http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryMetaAlertDao.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryMetaAlertDao.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryMetaAlertDao.java index cb7635e..fad0eda 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryMetaAlertDao.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryMetaAlertDao.java @@ -18,17 +18,23 @@ package org.apache.metron.indexing.dao; +import static org.apache.metron.common.Constants.GUID; + import com.google.common.collect.ImmutableList; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.stream.Collectors; import org.adrianwalker.multilinestring.Multiline; -import org.apache.metron.common.Constants; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest; import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse; +import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus; import org.apache.metron.indexing.dao.metaalert.MetaScores; import org.apache.metron.indexing.dao.search.FieldType; import org.apache.metron.indexing.dao.search.GetRequest; @@ -48,6 +54,8 @@ import org.json.simple.JSONObject; public class InMemoryMetaAlertDao implements MetaAlertDao { + public static Map<String, Collection<String>> METAALERT_STORE = new HashMap<>(); + private IndexDao indexDao; /** @@ -83,7 +91,7 @@ public class InMemoryMetaAlertDao implements MetaAlertDao { } @Override - public void init(IndexDao indexDao, String threatSort) { + public void init(IndexDao indexDao, Optional<String> threatSort) { this.indexDao = indexDao; // Ignore threatSort for test. } @@ -94,6 +102,11 @@ public class InMemoryMetaAlertDao implements MetaAlertDao { } @Override + public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws IOException { + return indexDao.getAllLatest(getRequests); + } + + @Override public void update(Document update, Optional<String> index) throws IOException { indexDao.update(update, index); } @@ -146,15 +159,16 @@ public class InMemoryMetaAlertDao implements MetaAlertDao { @Override public MetaAlertCreateResponse createMetaAlert(MetaAlertCreateRequest request) throws InvalidCreateException, IOException { - if (request.getGuidToIndices().isEmpty()) { + List<GetRequest> alertRequests = request.getAlerts(); + if (alertRequests.isEmpty()) { MetaAlertCreateResponse response = new MetaAlertCreateResponse(); response.setCreated(false); return response; } // Build meta alert json. Give it a reasonable GUID JSONObject metaAlert = new JSONObject(); - metaAlert.put(Constants.GUID, - "meta_" + (InMemoryDao.BACKING_STORE.get(MetaAlertDao.METAALERTS_INDEX).size() + 1)); + String metaAlertGuid = "meta_" + (InMemoryDao.BACKING_STORE.get(MetaAlertDao.METAALERTS_INDEX).size() + 1); + metaAlert.put(GUID, metaAlertGuid); JSONArray groupsArray = new JSONArray(); groupsArray.addAll(request.getGroups()); @@ -164,16 +178,17 @@ public class InMemoryMetaAlertDao implements MetaAlertDao { // For the purpose of testing, we're just using guids for the alerts field and grabbing the scores. JSONArray alertArray = new JSONArray(); List<Double> threatScores = new ArrayList<>(); - for (Map.Entry<String, String> entry : request.getGuidToIndices().entrySet()) { + Collection<String> alertGuids = new ArrayList<>(); + for (GetRequest alertRequest : alertRequests) { SearchRequest searchRequest = new SearchRequest(); - searchRequest.setIndices(ImmutableList.of(entry.getValue())); - searchRequest.setQuery("guid:" + entry.getKey()); + searchRequest.setIndices(ImmutableList.of(alertRequest.getIndex().get())); + searchRequest.setQuery("guid:" + alertRequest.getGuid()); try { SearchResponse searchResponse = search(searchRequest); List<SearchResult> searchResults = searchResponse.getResults(); if (searchResults.size() > 1) { throw new InvalidCreateException( - "Found more than one result for: " + entry.getKey() + ". Values: " + searchResults + "Found more than one result for: " + alertRequest.getGuid() + ". Values: " + searchResults ); } @@ -186,18 +201,79 @@ public class InMemoryMetaAlertDao implements MetaAlertDao { threatScores.add(threatScore); } } catch (InvalidSearchException e) { - throw new InvalidCreateException("Unable to find guid: " + entry.getKey(), e); + throw new InvalidCreateException("Unable to find guid: " + alertRequest.getGuid(), e); } + alertGuids.add(alertRequest.getGuid()); } metaAlert.put(MetaAlertDao.ALERT_FIELD, alertArray); metaAlert.putAll(new MetaScores(threatScores).getMetaScores()); + metaAlert.put(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString()); // Add the alert to the store, but make sure not to overwrite existing results InMemoryDao.BACKING_STORE.get(MetaAlertDao.METAALERTS_INDEX).add(metaAlert.toJSONString()); + METAALERT_STORE.put(metaAlertGuid, new HashSet<>(alertGuids)); + MetaAlertCreateResponse createResponse = new MetaAlertCreateResponse(); + createResponse.setGuid(metaAlertGuid); createResponse.setCreated(true); return createResponse; } + + @Override + public boolean addAlertsToMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests) throws IOException { + Collection<String> currentAlertGuids = METAALERT_STORE.get(metaAlertGuid); + if (currentAlertGuids == null) { + return false; + } + Collection<String> alertGuids = alertRequests.stream().map(GetRequest::getGuid).collect(Collectors.toSet()); + boolean added = currentAlertGuids.addAll(alertGuids); + if (added) { + METAALERT_STORE.put(metaAlertGuid, currentAlertGuids); + } + return added; + } + + @Override + public boolean removeAlertsFromMetaAlert(String metaAlertGuid, List<GetRequest> alertRequests) throws IOException { + Collection<String> currentAlertGuids = METAALERT_STORE.get(metaAlertGuid); + if (currentAlertGuids == null) { + return false; + } + Collection<String> alertGuids = alertRequests.stream().map(GetRequest::getGuid).collect(Collectors.toSet()); + boolean removed = currentAlertGuids.removeAll(alertGuids); + if (removed) { + METAALERT_STORE.put(metaAlertGuid, currentAlertGuids); + } + return removed; + } + + @SuppressWarnings("unchecked") + @Override + public boolean updateMetaAlertStatus(String metaAlertGuid, MetaAlertStatus status) + throws IOException { + boolean statusChanged = false; + List<String> metaAlerts = InMemoryDao.BACKING_STORE.get(MetaAlertDao.METAALERTS_INDEX); + for (String metaAlert: metaAlerts) { + JSONObject metaAlertJSON = JSONUtils.INSTANCE.load(metaAlert, JSONObject.class); + if (metaAlertGuid.equals(metaAlertJSON.get(GUID))) { + statusChanged = !status.getStatusString().equals(metaAlertJSON.get(STATUS_FIELD)); + if (statusChanged) { + metaAlertJSON.put(STATUS_FIELD, status.getStatusString()); + metaAlerts.remove(metaAlert); + metaAlerts.add(metaAlertJSON.toJSONString()); + InMemoryDao.BACKING_STORE.put(MetaAlertDao.METAALERTS_INDEX, metaAlerts); + } + break; + } + } + return statusChanged; + } + + public static void clear() { + InMemoryDao.clear(); + METAALERT_STORE.clear(); + } + } http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java index 2961d96..d991d50 100644 --- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java @@ -17,6 +17,8 @@ */ package org.apache.metron.indexing.dao; +import com.fasterxml.jackson.core.type.TypeReference; +import java.util.Iterator; import java.util.Optional; import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.common.utils.JSONUtils; @@ -29,8 +31,8 @@ import org.apache.metron.indexing.dao.search.SearchRequest; import org.apache.metron.indexing.dao.search.SearchResponse; import org.apache.metron.indexing.dao.search.SearchResult; import org.apache.metron.indexing.dao.search.GroupResult; +import org.apache.metron.indexing.dao.update.Document; import org.apache.metron.integration.InMemoryComponent; -import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -103,6 +105,21 @@ public abstract class SearchIntegrationTest { public static String findOneGuidQuery; /** + * [ + * { + * "guid": "bro-1", + * "sensorType": "bro" + * }, + * { + * "guid": "bro-2", + * "sensorType": "bro" + * } + * ] + */ + @Multiline + public static String getAllLatestQuery; + + /** * { * "indices": ["bro", "snort"], * "query": "ip_src_addr:192.168.1.1", @@ -390,6 +407,19 @@ public abstract class SearchIntegrationTest { Assert.assertEquals("bro", doc.get("source:type")); Assert.assertEquals(3, doc.get("timestamp")); } + //Get All Latest Guid Testcase + { + List<GetRequest> request = JSONUtils.INSTANCE.load(getAllLatestQuery, new TypeReference<List<GetRequest>>() { + }); + Iterator<Document> response = dao.getAllLatest(request).iterator(); + Document bro2 = response.next(); + Assert.assertEquals("bro_1", bro2.getDocument().get("guid")); + Assert.assertEquals("bro", bro2.getDocument().get("source:type")); + Document snort2 = response.next(); + Assert.assertEquals("bro_2", snort2.getDocument().get("guid")); + Assert.assertEquals("bro", snort2.getDocument().get("source:type")); + Assert.assertFalse(response.hasNext()); + } //Filter test case { SearchRequest request = JSONUtils.INSTANCE.load(filterQuery, SearchRequest.class); http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HBaseDaoIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HBaseDaoIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HBaseDaoIntegrationTest.java new file mode 100644 index 0000000..aa32aa0 --- /dev/null +++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HBaseDaoIntegrationTest.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.metron.indexing.integration; + +import static org.apache.metron.indexing.dao.HBaseDao.HBASE_CF; +import static org.apache.metron.indexing.dao.HBaseDao.HBASE_TABLE; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import org.apache.commons.codec.binary.Hex; +import org.apache.metron.hbase.mock.MockHBaseTableProvider; +import org.apache.metron.indexing.dao.AccessConfig; +import org.apache.metron.indexing.dao.HBaseDao; +import org.apache.metron.indexing.dao.IndexDao; +import org.apache.metron.indexing.dao.search.GetRequest; +import org.apache.metron.indexing.dao.update.Document; +import org.junit.After; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +public class HBaseDaoIntegrationTest { + + private static final String TABLE_NAME = "metron_update"; + private static final String COLUMN_FAMILY = "cf"; + private static final String SENSOR_TYPE = "test"; + + private static IndexDao hbaseDao; + private static byte[] expectedKeySerialization = new byte[] { + (byte)0xf5,0x53,0x76,(byte)0x96,0x67,0x3a, + (byte)0xc1,(byte)0xaf,(byte)0xff,0x41,0x33,(byte)0x9d, + (byte)0xac,(byte)0xb9,0x1a,(byte)0xb0,0x00,0x04, + 0x67,0x75,0x69,0x64,0x00,0x0a, + 0x73,0x65,0x6e,0x73,0x6f,0x72, + 0x54,0x79,0x70,0x65 + }; + + @BeforeClass + public static void startHBase() throws Exception { + AccessConfig accessConfig = new AccessConfig(); + accessConfig.setMaxSearchResults(1000); + accessConfig.setMaxSearchGroups(1000); + accessConfig.setGlobalConfigSupplier(() -> new HashMap<String, Object>() {{ + put(HBASE_TABLE, TABLE_NAME); + put(HBASE_CF, COLUMN_FAMILY); + }}); + MockHBaseTableProvider.addToCache(TABLE_NAME, COLUMN_FAMILY); + accessConfig.setTableProvider(new MockHBaseTableProvider()); + + hbaseDao = new HBaseDao(); + hbaseDao.init(accessConfig); + } + + @After + public void clearTable() throws Exception { + MockHBaseTableProvider.clear(); + } + + /** + * IF this test fails then you have broken the key serialization in that your change has + * caused a key to change serialization, so keys from previous releases will not be able to be found + * under your scheme. Please either provide a migration plan or undo this change. DO NOT CHANGE THIS + * TEST BLITHELY! + * @throws Exception + */ + @Test + public void testKeySerializationRemainsConstant() throws IOException { + HBaseDao.Key k = new HBaseDao.Key("guid", "sensorType"); + byte[] raw = k.toBytes(); + Assert.assertArrayEquals(raw, expectedKeySerialization); + } + + + @Test + public void testKeySerialization() throws Exception { + HBaseDao.Key k = new HBaseDao.Key("guid", "sensorType"); + Assert.assertEquals(k, HBaseDao.Key.fromBytes(HBaseDao.Key.toBytes(k))); + } + + @Test(expected = IllegalStateException.class) + public void testKeySerializationWithInvalidGuid() throws Exception { + HBaseDao.Key k = new HBaseDao.Key(null, "sensorType"); + Assert.assertEquals(k, HBaseDao.Key.fromBytes(HBaseDao.Key.toBytes(k))); + } + + @Test(expected = IllegalStateException.class) + public void testKeySerializationWithInvalidSensorType() throws Exception { + HBaseDao.Key k = new HBaseDao.Key("guid", null); + Assert.assertEquals(k, HBaseDao.Key.fromBytes(HBaseDao.Key.toBytes(k))); + } + + @Test + public void shouldGetLatest() throws Exception { + // Load alerts + List<Document> alerts = buildAlerts(3); + Map<Document, Optional<String>> updates = alerts.stream() + .collect(Collectors.toMap(document -> document, document -> Optional.empty())); + hbaseDao.batchUpdate(updates); + + Document actualDocument = hbaseDao.getLatest("message_1", SENSOR_TYPE); + Document expectedDocument = alerts.get(1); + Assert.assertEquals(expectedDocument, actualDocument); + } + + @Test + public void shouldGetAllLatest() throws Exception { + // Load alerts + List<Document> alerts = buildAlerts(15); + alerts.stream().collect(Collectors.toMap(Document::getGuid, document -> Optional.empty())); + Map<Document, Optional<String>> updates = alerts.stream() + .collect(Collectors.toMap(document -> document, document -> Optional.empty())); + hbaseDao.batchUpdate(updates); + + int expectedCount = 12; + List<GetRequest> getRequests = new ArrayList<>(); + for(int i = 1; i < expectedCount + 1; i ++) { + getRequests.add(new GetRequest("message_" + i, SENSOR_TYPE)); + } + Iterator<Document> results = hbaseDao.getAllLatest(getRequests).iterator(); + + for (int i = 0; i < expectedCount; i++) { + Document expectedDocument = alerts.get(i + 1); + Document actualDocument = results.next(); + Assert.assertEquals(expectedDocument, actualDocument); + } + + Assert.assertFalse("Result size should be 12 but was greater", results.hasNext()); + } + + protected List<Document> buildAlerts(int count) throws IOException { + List<Document> alerts = new ArrayList<>(); + for (int i = 0; i < count; ++i) { + String guid = "message_" + i; + String json = "{\"guid\":\"message_" + i + "\", \"source:type\":\"test\"}"; + Document alert = new Document(json, guid, SENSOR_TYPE, System.currentTimeMillis()); + alerts.add(alert); + } + return alerts; + } + +}
