Repository: metron
Updated Branches:
  refs/heads/master c4c930f7c -> fd896fbeb


http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java
index 1775018..3103ea7 100644
--- 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/HBaseDao.java
@@ -18,13 +18,19 @@
 
 package org.apache.metron.indexing.dao;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Optional;
+
+import com.google.common.hash.Hasher;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -32,7 +38,9 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.common.utils.KeyUtil;
 import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.GetRequest;
 import org.apache.metron.indexing.dao.search.GroupRequest;
 import org.apache.metron.indexing.dao.search.GroupResponse;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
@@ -46,7 +54,7 @@ import org.apache.metron.indexing.dao.update.Document;
  * * Get document
  *
  * The mechanism here is that updates to documents will be added to a HBase 
Table as a write-ahead log.
- * The Key for a row supporting a given document will be the GUID, which 
should be sufficiently distributed.
+ * The Key for a row supporting a given document will be the GUID plus the 
sensor type, which should be sufficiently distributed.
  * Every new update will have a column added (column qualifier will be the 
timestamp of the update).
  * Upon retrieval, the most recent column will be returned.
  *
@@ -57,6 +65,72 @@ public class HBaseDao implements IndexDao {
   private HTableInterface tableInterface;
   private byte[] cf;
   private AccessConfig config;
+
+  /**
+   * Implements the HBaseDao row key and exposes convenience methods for 
serializing/deserializing the row key.
+   * The row key is made of a GUID and sensor type along with a prefix to 
ensure data is distributed evenly.
+   */
+  public static class Key {
+    private String guid;
+    private String sensorType;
+    public Key(String guid, String sensorType) {
+      this.guid = guid;
+      this.sensorType = sensorType;
+    }
+
+    public String getGuid() {
+      return guid;
+    }
+
+    public String getSensorType() {
+      return sensorType;
+    }
+
+    public static Key fromBytes(byte[] buffer) throws IOException {
+      ByteArrayInputStream baos = new ByteArrayInputStream(buffer);
+      DataInputStream w = new DataInputStream(baos);
+      baos.skip(KeyUtil.HASH_PREFIX_SIZE);
+      return new Key(w.readUTF(), w.readUTF());
+    }
+
+    public byte[] toBytes() throws IOException {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      if(getGuid() == null || getSensorType() == null) {
+        throw new IllegalStateException("Guid and sensor type must not be 
null: guid = " + getGuid() + ", sensorType = " + getSensorType());
+      }
+      DataOutputStream w = new DataOutputStream(baos);
+      w.writeUTF(getGuid());
+      w.writeUTF(getSensorType());
+      w.flush();
+      byte[] key = baos.toByteArray();
+      byte[] prefix = KeyUtil.INSTANCE.getPrefix(key);
+      return KeyUtil.INSTANCE.merge(prefix, key);
+    }
+
+    public static byte[] toBytes(Key k) throws IOException {
+      return k.toBytes();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      Key key = (Key) o;
+
+      if (getGuid() != null ? !getGuid().equals(key.getGuid()) : key.getGuid() 
!= null) return false;
+      return getSensorType() != null ? 
getSensorType().equals(key.getSensorType()) : key.getSensorType() == null;
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = getGuid() != null ? getGuid().hashCode() : 0;
+      result = 31 * result + (getSensorType() != null ? 
getSensorType().hashCode() : 0);
+      return result;
+    }
+  }
+
   public HBaseDao() {
 
   }
@@ -102,9 +176,32 @@ public class HBaseDao implements IndexDao {
 
   @Override
   public synchronized Document getLatest(String guid, String sensorType) 
throws IOException {
-    Get get = new Get(guid.getBytes());
+    Key k = new Key(guid, sensorType);
+    Get get = new Get(Key.toBytes(k));
     get.addFamily(cf);
     Result result = getTableInterface().get(get);
+    return getDocumentFromResult(result);
+  }
+
+  @Override
+  public Iterable<Document> getAllLatest(
+      List<GetRequest> getRequests) throws IOException {
+    List<Get> gets = new ArrayList<>();
+    for (GetRequest getRequest: getRequests) {
+      gets.add(buildGet(getRequest));
+    }
+    Result[] results = getTableInterface().get(gets);
+    List<Document> allLatest = new ArrayList<>();
+    for (Result result: results) {
+      Document d = getDocumentFromResult(result);
+      if (d != null) {
+        allLatest.add(d);
+      }
+    }
+    return allLatest;
+  }
+
+  private Document getDocumentFromResult(Result result) throws IOException {
     NavigableMap<byte[], byte[]> columns = result.getFamilyMap( cf);
     if(columns == null || columns.size() == 0) {
       return null;
@@ -112,8 +209,14 @@ public class HBaseDao implements IndexDao {
     Map.Entry<byte[], byte[]> entry= columns.lastEntry();
     Long ts = Bytes.toLong(entry.getKey());
     if(entry.getValue()!= null) {
-      String json = new String(entry.getValue());
-      return new Document(json, guid, sensorType, ts);
+      Map<String, Object> json = JSONUtils.INSTANCE.load(new 
String(entry.getValue()),
+          new TypeReference<Map<String, Object>>() {});
+      try {
+        Key k = Key.fromBytes(result.getRow());
+        return new Document(json, k.getGuid(), k.getSensorType(), ts);
+      } catch (IOException e) {
+        throw new RuntimeException("Unable to convert row key to a document", 
e);
+      }
     }
     else {
       return null;
@@ -126,8 +229,6 @@ public class HBaseDao implements IndexDao {
     getTableInterface().put(put);
   }
 
-
-
   @Override
   public void batchUpdate(Map<Document, Optional<String>> updates) throws 
IOException {
     List<Put> puts = new ArrayList<>();
@@ -140,8 +241,16 @@ public class HBaseDao implements IndexDao {
     getTableInterface().put(puts);
   }
 
-  protected Put buildPut(Document update) throws JsonProcessingException {
-    Put put = new Put(update.getGuid().getBytes());
+  protected Get buildGet(GetRequest getRequest) throws IOException {
+    Key k = new Key(getRequest.getGuid(), getRequest.getSensorType());
+    Get get = new Get(Key.toBytes(k));
+    get.addFamily(cf);
+    return get;
+  }
+
+  protected Put buildPut(Document update) throws IOException {
+    Key k = new Key(update.getGuid(), update.getSensorType());
+    Put put = new Put(Key.toBytes(k));
     long ts = update.getTimestamp() == null ? System.currentTimeMillis() : 
update.getTimestamp();
     byte[] columnQualifier = Bytes.toBytes(ts);
     byte[] doc = JSONUtils.INSTANCE.toJSONPretty(update.getDocument());
@@ -149,6 +258,7 @@ public class HBaseDao implements IndexDao {
     return put;
   }
 
+
   @Override
   public Map<String, Map<String, FieldType>> getColumnMetadata(List<String> 
indices) throws IOException {
     return null;

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
index 4b7829e..8855a14 100644
--- 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/IndexDao.java
@@ -36,6 +36,10 @@ import 
org.apache.metron.indexing.dao.update.OriginalNotFoundException;
 import org.apache.metron.indexing.dao.update.PatchRequest;
 import org.apache.metron.indexing.dao.update.ReplaceRequest;
 
+/**
+ * The IndexDao provides a common interface for retrieving and storing data in 
a variety of persistent stores.
+ * Document reads and writes require a GUID and sensor type with an index 
being optional.
+ */
 public interface IndexDao {
 
   /**
@@ -66,6 +70,15 @@ public interface IndexDao {
   Document getLatest(String guid, String sensorType) throws IOException;
 
   /**
+   * Return a list of the latest versions of documents given a list of GUIDs 
and sensor types.
+   *
+   * @param getRequests A list of get requests for documents
+   * @return A list of documents matching or an empty list if not available.
+   * @throws IOException
+   */
+  Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws 
IOException;
+
+  /**
    * Return the latest version of a document given a GetRequest.
    * @param request The GetRequest which indicates the GUID and sensor type.
    * @return Optionally the document (dependent upon existence in the index).
@@ -82,7 +95,9 @@ public interface IndexDao {
   }
 
   /**
-   * Update given a Document and optionally the index where the document 
exists.
+   * Update a given Document and optionally the index where the document 
exists.  This is a full update,
+   * meaning the current document will be replaced if it exists or a new 
document will be created if it does
+   * not exist.  Partial updates are not supported in this method.
    *
    * @param update The document to replace from the index.
    * @param index The index where the document lives.
@@ -91,7 +106,7 @@ public interface IndexDao {
   void update(Document update, Optional<String> index) throws IOException;
 
   /**
-   * Update given a Document and optionally the index where the document 
exists.
+   * Similar to the update method but accepts multiple documents and performs 
updates in batch.
    *
    * @param updates A map of the documents to update to the index where they 
live.
    * @throws IOException
@@ -108,6 +123,13 @@ public interface IndexDao {
   default void patch( PatchRequest request
                     , Optional<Long> timestamp
                     ) throws OriginalNotFoundException, IOException {
+    Document d = getPatchedDocument(request, timestamp);
+    update(d, Optional.ofNullable(request.getIndex()));
+  }
+
+  default Document getPatchedDocument(PatchRequest request
+      , Optional<Long> timestamp
+      ) throws OriginalNotFoundException, IOException {
     Map<String, Object> latest = request.getSource();
     if(latest == null) {
       Document latestDoc = getLatest(request.getGuid(), 
request.getSensorType());
@@ -121,13 +143,11 @@ public interface IndexDao {
     JsonNode originalNode = JSONUtils.INSTANCE.convert(latest, JsonNode.class);
     JsonNode patched = JSONUtils.INSTANCE.applyPatch(request.getPatch(), 
originalNode);
     Map<String, Object> updated = JSONUtils.INSTANCE.getMapper()
-                                           .convertValue(patched, new 
TypeReference<Map<String, Object>>() {});
-    Document d = new Document( updated
-                             , request.getGuid()
-                             , request.getSensorType()
-                             , timestamp.orElse(System.currentTimeMillis())
-                             );
-    update(d, Optional.ofNullable(request.getIndex()));
+        .convertValue(patched, new TypeReference<Map<String, Object>>() {});
+    return new Document( updated
+        , request.getGuid()
+        , request.getSensorType()
+        , timestamp.orElse(System.currentTimeMillis()));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MetaAlertDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MetaAlertDao.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MetaAlertDao.java
index de12f22..4530d2a 100644
--- 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MetaAlertDao.java
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MetaAlertDao.java
@@ -18,13 +18,50 @@
 
 package org.apache.metron.indexing.dao;
 
+import java.util.List;
+import java.util.Optional;
 import java.io.IOException;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
+import org.apache.metron.indexing.dao.search.GetRequest;
 import org.apache.metron.indexing.dao.search.InvalidCreateException;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 
+/**
+ * The MetaAlertDao exposes methods for interacting with meta alerts.  Meta 
alerts are objects that contain
+ * alerts and summary statistics based on the scores of these alerts.  Meta 
alerts are returned in searches
+ * just as alerts are and match based on the field values of child alerts.  If 
a child alert matches a search
+ * the meta alert will be returned while the original child alert will not.  A 
meta alert also contains a
+ * status field that controls its inclusion in search results and a groups 
field that can be used to track
+ * the groups a meta alert was created from.
+ *
+ * The structure of a meta alert is as follows:
+ * {
+ *   "guid": "meta alert guid",
+ *   "timestamp": timestamp,
+ *   "source:type": "metaalert",
+ *   "alerts": [ array of child alerts ],
+ *   "status": "active or inactive",
+ *   "groups": [ array of group names ],
+ *   "average": 10,
+ *   "max": 10,
+ *   "threat:triage:score": 30,
+ *   "count": 3,
+ *   "sum": 30,
+ *   "min": 10,
+ *   "median": 10
+ * }
+ *
+ * A child alert that has been added to a meta alert will store the meta alert 
GUID in a "metaalerts" field.
+ * This field is an array of meta alert GUIDs, meaning a child alert can be 
contained in multiple meta alerts.
+ * Any update to a child alert will trigger an update to the meta alert so 
that the alert inside a meta alert
+ * and the original alert will be kept in sync.
+ *
+ * Other fields can be added to a meta alert through the patch method on the 
IndexDao interface.  However, attempts
+ * to directly change the "alerts" or "status" field will result in an 
exception.
+ */
 public interface MetaAlertDao extends IndexDao {
 
   String METAALERTS_INDEX = "metaalert_index";
@@ -46,21 +83,65 @@ public interface MetaAlertDao extends IndexDao {
   SearchResponse getAllMetaAlertsForAlert(String guid) throws 
InvalidSearchException;
 
   /**
-   * Create a meta alert.
-   * @param request The parameters for creating the new meta alert
-   * @return A response indicating success or failure
+   * Creates a meta alert from a list of child alerts.  The most recent 
version of each child alert is
+   * retrieved using the DAO abstractions.
+   *
+   * @param request A request object containing get requests for alerts to be 
added and a list of groups
+   * @return A response indicating success or failure along with the GUID of 
the new meta alert
    * @throws InvalidCreateException If a malformed create request is provided
    * @throws IOException If a problem occurs during communication
    */
   MetaAlertCreateResponse createMetaAlert(MetaAlertCreateRequest request)
       throws InvalidCreateException, IOException;
 
+
+  /**
+   * Adds a list of alerts to an existing meta alert.  This will add each 
alert object to the "alerts" array in the meta alert
+   * and also add the meta alert GUID to each child alert's "metaalerts" 
array.  After alerts have been added the
+   * meta alert scores are recalculated.  Any alerts already in the meta alert 
are skipped and no updates are
+   * performed if all of the alerts are already in the meta alert.  The most 
recent version of each child alert is
+   * retrieved using the DAO abstractions.  Alerts cannot be added to an 
'inactive' meta alert.
+   *
+   * @param metaAlertGuid The meta alert GUID
+   * @param getRequests Get requests for alerts to be added
+   * @return True or false depending on if any alerts were added
+   * @throws IOException
+   */
+  boolean addAlertsToMetaAlert(String metaAlertGuid, List<GetRequest> 
getRequests) throws IOException;
+
+  /**
+   * Removes a list of alerts from an existing meta alert.  This will remove 
each alert object from the "alerts" array in the meta alert
+   * and also remove the meta alert GUID from each child alert's "metaalerts" 
array.  After alerts have been removed the
+   * meta alert scores are recalculated.  Any alerts not contained in the meta 
alert are skipped and no updates are
+   * performed if no alerts can be found in the meta alert.  Alerts cannot be 
removed from an 'inactive' meta alert.
+   *
+   * @param metaAlertGuid The meta alert GUID
+   * @param getRequests Get requests for alerts to be removed
+   * @return True or false depending on if any alerts were removed
+   * @throws IOException
+   */
+  boolean removeAlertsFromMetaAlert(String metaAlertGuid, List<GetRequest> 
getRequests) throws IOException;
+
+  /**
+   * The meta alert status field can be set to either 'active' or 'inactive' 
and will control whether or not meta alerts
+   * (and child alerts) appear in search results.  An 'active' status will 
cause meta alerts to appear in search
+   * results instead of its child alerts and an 'inactive' status will 
suppress the meta alert from search results
+   * with child alerts appearing in search results as normal.  A change to 
'inactive' will cause the meta alert GUID to
+   * be removed from all its child alerts' "metaalerts" fields.  A change back 
to 'active' will have the opposite effect.
+   *
+   * @param metaAlertGuid The GUID of the meta alert
+   * @param status A status value of 'active' or 'inactive'
+   * @return True or false depending on if the status was changed
+   * @throws IOException
+   */
+  boolean updateMetaAlertStatus(String metaAlertGuid, MetaAlertStatus status) 
throws IOException;
+
   /**
    * Initializes a Meta Alert DAO with default "sum" meta alert threat sorting.
    * @param indexDao The DAO to wrap for our queries.
    */
   default void init(IndexDao indexDao) {
-    init(indexDao, null);
+    init(indexDao, Optional.empty());
   }
 
   /**
@@ -69,5 +150,5 @@ public interface MetaAlertDao extends IndexDao {
    * @param threatSort The aggregation to use as the threat field. E.g. "sum", 
"median", etc.
    *     null is "sum"
    */
-  void init(IndexDao indexDao, String threatSort);
+  void init(IndexDao indexDao, Optional<String> threatSort);
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
index 779e6c6..ed8bc95 100644
--- 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/MultiIndexDao.java
@@ -30,6 +30,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.GetRequest;
 import org.apache.metron.indexing.dao.search.GroupRequest;
 import org.apache.metron.indexing.dao.search.GroupResponse;
 import org.apache.metron.indexing.dao.search.InvalidSearchException;
@@ -127,6 +128,25 @@ public class MultiIndexDao implements IndexDao {
 
   }
 
+  private static class DocumentIterableContainer {
+    private Optional<Iterable<Document>> d = Optional.empty();
+    private Optional<Throwable> t = Optional.empty();
+    public DocumentIterableContainer(Iterable<Document> d) {
+      this.d = Optional.ofNullable(d);
+    }
+    public DocumentIterableContainer(Throwable t) {
+      this.t = Optional.ofNullable(t);
+    }
+
+    public Optional<Iterable<Document>> getDocumentIterable() {
+      return d;
+    }
+    public Optional<Throwable> getException() {
+      return t;
+    }
+
+  }
+
   @Override
   public SearchResponse search(SearchRequest searchRequest) throws 
InvalidSearchException {
     for(IndexDao dao : indices) {
@@ -189,6 +209,40 @@ public class MultiIndexDao implements IndexDao {
     return ret;
   }
 
+  @Override
+  public Iterable<Document> getAllLatest(
+      List<GetRequest> getRequests) throws IOException {
+    Iterable<Document> ret = null;
+    List<DocumentIterableContainer> output =
+        indices.parallelStream().map(dao -> {
+          try {
+            return new 
DocumentIterableContainer(dao.getAllLatest(getRequests));
+          } catch (Throwable e) {
+            return new DocumentIterableContainer(e);
+          }
+        }).collect(Collectors.toList());
+
+    List<String> error = new ArrayList<>();
+    for(DocumentIterableContainer dc : output) {
+      if(dc.getException().isPresent()) {
+        Throwable e = dc.getException().get();
+        error.add(e.getMessage() + "\n" + ExceptionUtils.getStackTrace(e));
+      }
+      else {
+        if(dc.getDocumentIterable().isPresent()) {
+          Iterable<Document> documents = dc.getDocumentIterable().get();
+          if(ret == null) {
+            ret = documents;
+          }
+        }
+      }
+    }
+    if(error.size() > 0) {
+      throw new IOException(Joiner.on("\n").join(error));
+    }
+    return ret;
+  }
+
   public List<IndexDao> getIndices() {
     return indices;
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertAddRemoveRequest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertAddRemoveRequest.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertAddRemoveRequest.java
new file mode 100644
index 0000000..6183d37
--- /dev/null
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertAddRemoveRequest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership.  The ASF licenses this file to you under the Apache 
License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the 
License.  You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.metron.indexing.dao.metaalert;
+
+import java.util.Collection;
+import java.util.List;
+import org.apache.metron.indexing.dao.search.GetRequest;
+
+public class MetaAlertAddRemoveRequest {
+
+  private String metaAlertGuid;
+  private List<GetRequest> alerts;
+
+  public String getMetaAlertGuid() {
+    return metaAlertGuid;
+  }
+
+  public void setMetaAlertGuid(String metaAlertGuid) {
+    this.metaAlertGuid = metaAlertGuid;
+  }
+
+  public List<GetRequest> getAlerts() {
+    return alerts;
+  }
+
+  public void setAlerts(List<GetRequest> alerts) {
+    this.alerts = alerts;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertCreateRequest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertCreateRequest.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertCreateRequest.java
index 388527a..d368b3a 100644
--- 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertCreateRequest.java
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertCreateRequest.java
@@ -22,23 +22,23 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.metron.indexing.dao.search.GetRequest;
 
 public class MetaAlertCreateRequest {
-  // A map from the alert GUID to the Document index
-  private Map<String, String> guidToIndices;
+  private List<GetRequest> alerts;
   private List<String> groups;
 
   public MetaAlertCreateRequest() {
-    this.guidToIndices = new HashMap<>();
+    this.alerts = new ArrayList<>();
     this.groups = new ArrayList<>();
   }
 
-  public Map<String, String> getGuidToIndices() {
-    return guidToIndices;
+  public List<GetRequest> getAlerts() {
+    return alerts;
   }
 
-  public void setGuidToIndices(Map<String, String> guidToIndices) {
-    this.guidToIndices = guidToIndices;
+  public void setAlerts(List<GetRequest> alerts) {
+    this.alerts = alerts;
   }
 
   public List<String> getGroups() {

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertStatus.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertStatus.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertStatus.java
new file mode 100644
index 0000000..c9b0138
--- /dev/null
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/metaalert/MetaAlertStatus.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.indexing.dao.metaalert;
+
+public enum MetaAlertStatus {
+  ACTIVE("active"),
+  INACTIVE("inactive");
+
+  private String statusString;
+
+  MetaAlertStatus(String statusString) {
+    this.statusString = statusString;
+  }
+
+  public String getStatusString() {
+    return statusString;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GetRequest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GetRequest.java
 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GetRequest.java
index eb255dc..959d4e6 100644
--- 
a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GetRequest.java
+++ 
b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/GetRequest.java
@@ -17,9 +17,27 @@
  */
 package org.apache.metron.indexing.dao.search;
 
+import com.fasterxml.jackson.annotation.JsonGetter;
+import java.util.Optional;
+
 public class GetRequest {
-  String guid;
-  String sensorType;
+  private String guid;
+  private String sensorType;
+  private String index;
+
+  public GetRequest() {
+  }
+
+  public GetRequest(String guid, String sensorType) {
+    this.guid = guid;
+    this.sensorType = sensorType;
+  }
+
+  public GetRequest(String guid, String sensorType, String index) {
+    this.guid = guid;
+    this.sensorType = sensorType;
+    this.index = index;
+  }
 
   /**
    * The GUID of the document
@@ -44,4 +62,17 @@ public class GetRequest {
   public void setSensorType(String sensorType) {
     this.sensorType = sensorType;
   }
+
+  public Optional<String> getIndex() {
+    return index != null ? Optional.of(this.index) : Optional.empty();
+  }
+
+  @JsonGetter("index")
+  public String getIndexString() {
+    return index;
+  }
+
+  public void setIndex(String index) {
+    this.index = index;
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
 
b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
index f48187e..3bce4d0 100644
--- 
a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
+++ 
b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryDao.java
@@ -17,10 +17,13 @@
  */
 package org.apache.metron.indexing.dao;
 
+import static org.apache.metron.common.Constants.SENSOR_TYPE;
+
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ComparisonChain;
 import com.google.common.collect.Iterables;
+import java.util.stream.Collectors;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.indexing.dao.search.*;
@@ -32,7 +35,7 @@ import java.util.*;
 public class InMemoryDao implements IndexDao {
   // Map from index to list of documents as JSON strings
   public static Map<String, List<String>> BACKING_STORE = new HashMap<>();
-  public static Map<String, Map<String, FieldType>> COLUMN_METADATA;
+  public static Map<String, Map<String, FieldType>> COLUMN_METADATA = new 
HashMap<>();
   private AccessConfig config;
 
   @Override
@@ -169,7 +172,7 @@ public class InMemoryDao implements IndexDao {
     return false;
   }
 
-  private static Map<String, Object> parse(String doc) {
+  public static Map<String, Object> parse(String doc) {
     try {
       return JSONUtils.INSTANCE.load(doc, new TypeReference<Map<String, 
Object>>() {});
     } catch (IOException e) {
@@ -199,6 +202,24 @@ public class InMemoryDao implements IndexDao {
   }
 
   @Override
+  public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws 
IOException {
+    List<Document> documents = new ArrayList<>();
+    for(Map.Entry<String, List<String>> kv: BACKING_STORE.entrySet()) {
+      for(String doc : kv.getValue()) {
+        Map<String, Object> docParsed = parse(doc);
+        String guid = (String) docParsed.getOrDefault(Constants.GUID, "");
+        for (GetRequest getRequest: getRequests) {
+          if(getRequest.getGuid().equals(guid)) {
+            documents.add(new Document(doc, guid, getRequest.getSensorType(), 
0L));
+          }
+        }
+
+      }
+    }
+    return documents;
+  }
+
+  @Override
   public void update(Document update, Optional<String> index) throws 
IOException {
     for (Map.Entry<String, List<String>> kv : BACKING_STORE.entrySet()) {
       if (kv.getKey().startsWith(update.getSensorType())) {

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryMetaAlertDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryMetaAlertDao.java
 
b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryMetaAlertDao.java
index cb7635e..fad0eda 100644
--- 
a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryMetaAlertDao.java
+++ 
b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/InMemoryMetaAlertDao.java
@@ -18,17 +18,23 @@
 
 package org.apache.metron.indexing.dao;
 
+import static org.apache.metron.common.Constants.GUID;
+
 import com.google.common.collect.ImmutableList;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.common.Constants;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateRequest;
 import org.apache.metron.indexing.dao.metaalert.MetaAlertCreateResponse;
+import org.apache.metron.indexing.dao.metaalert.MetaAlertStatus;
 import org.apache.metron.indexing.dao.metaalert.MetaScores;
 import org.apache.metron.indexing.dao.search.FieldType;
 import org.apache.metron.indexing.dao.search.GetRequest;
@@ -48,6 +54,8 @@ import org.json.simple.JSONObject;
 
 public class InMemoryMetaAlertDao implements MetaAlertDao {
 
+  public static Map<String, Collection<String>> METAALERT_STORE = new 
HashMap<>();
+
   private IndexDao indexDao;
 
   /**
@@ -83,7 +91,7 @@ public class InMemoryMetaAlertDao implements MetaAlertDao {
   }
 
   @Override
-  public void init(IndexDao indexDao, String threatSort) {
+  public void init(IndexDao indexDao, Optional<String> threatSort) {
     this.indexDao = indexDao;
     // Ignore threatSort for test.
   }
@@ -94,6 +102,11 @@ public class InMemoryMetaAlertDao implements MetaAlertDao {
   }
 
+  /**
+   * Delegates the multi-document lookup to the wrapped IndexDao.
+   */
   @Override
+  public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws 
+IOException {
+    return indexDao.getAllLatest(getRequests);
+  }
+
+  @Override
   public void update(Document update, Optional<String> index) throws 
IOException {
     indexDao.update(update, index);
   }
@@ -146,15 +159,16 @@ public class InMemoryMetaAlertDao implements MetaAlertDao 
{
   @Override
   public MetaAlertCreateResponse createMetaAlert(MetaAlertCreateRequest 
request)
       throws InvalidCreateException, IOException {
-    if (request.getGuidToIndices().isEmpty()) {
+    List<GetRequest> alertRequests = request.getAlerts();
+    if (alertRequests.isEmpty()) {
       MetaAlertCreateResponse response = new MetaAlertCreateResponse();
       response.setCreated(false);
       return response;
     }
     // Build meta alert json.  Give it a reasonable GUID
     JSONObject metaAlert = new JSONObject();
-    metaAlert.put(Constants.GUID,
-        "meta_" + 
(InMemoryDao.BACKING_STORE.get(MetaAlertDao.METAALERTS_INDEX).size() + 1));
+    String metaAlertGuid = "meta_" + 
(InMemoryDao.BACKING_STORE.get(MetaAlertDao.METAALERTS_INDEX).size() + 1);
+    metaAlert.put(GUID, metaAlertGuid);
 
     JSONArray groupsArray = new JSONArray();
     groupsArray.addAll(request.getGroups());
@@ -164,16 +178,17 @@ public class InMemoryMetaAlertDao implements MetaAlertDao 
{
     // For the purpose of testing, we're just using guids for the alerts field 
and grabbing the scores.
     JSONArray alertArray = new JSONArray();
     List<Double> threatScores = new ArrayList<>();
-    for (Map.Entry<String, String> entry : 
request.getGuidToIndices().entrySet()) {
+    Collection<String> alertGuids = new ArrayList<>();
+    for (GetRequest alertRequest : alertRequests) {
       SearchRequest searchRequest = new SearchRequest();
-      searchRequest.setIndices(ImmutableList.of(entry.getValue()));
-      searchRequest.setQuery("guid:" + entry.getKey());
+      
searchRequest.setIndices(ImmutableList.of(alertRequest.getIndex().get()));
+      searchRequest.setQuery("guid:" + alertRequest.getGuid());
       try {
         SearchResponse searchResponse = search(searchRequest);
         List<SearchResult> searchResults = searchResponse.getResults();
         if (searchResults.size() > 1) {
           throw new InvalidCreateException(
-              "Found more than one result for: " + entry.getKey() + ". Values: 
" + searchResults
+              "Found more than one result for: " + alertRequest.getGuid() + ". 
Values: " + searchResults
           );
         }
 
@@ -186,18 +201,79 @@ public class InMemoryMetaAlertDao implements MetaAlertDao 
{
           threatScores.add(threatScore);
         }
       } catch (InvalidSearchException e) {
-        throw new InvalidCreateException("Unable to find guid: " + 
entry.getKey(), e);
+        throw new InvalidCreateException("Unable to find guid: " + 
alertRequest.getGuid(), e);
       }
+      alertGuids.add(alertRequest.getGuid());
     }
 
     metaAlert.put(MetaAlertDao.ALERT_FIELD, alertArray);
     metaAlert.putAll(new MetaScores(threatScores).getMetaScores());
+    metaAlert.put(STATUS_FIELD, MetaAlertStatus.ACTIVE.getStatusString());
 
     // Add the alert to the store, but make sure not to overwrite existing 
results
     
InMemoryDao.BACKING_STORE.get(MetaAlertDao.METAALERTS_INDEX).add(metaAlert.toJSONString());
 
+    METAALERT_STORE.put(metaAlertGuid, new HashSet<>(alertGuids));
+
     MetaAlertCreateResponse createResponse = new MetaAlertCreateResponse();
+    createResponse.setGuid(metaAlertGuid);
     createResponse.setCreated(true);
     return createResponse;
   }
+
+  @Override
+  public boolean addAlertsToMetaAlert(String metaAlertGuid, List<GetRequest> 
alertRequests) throws IOException {
+    Collection<String> currentAlertGuids = METAALERT_STORE.get(metaAlertGuid);
+    if (currentAlertGuids == null) {
+      return false;
+    }
+    Collection<String> alertGuids = 
alertRequests.stream().map(GetRequest::getGuid).collect(Collectors.toSet());
+    boolean added = currentAlertGuids.addAll(alertGuids);
+    if (added) {
+      METAALERT_STORE.put(metaAlertGuid, currentAlertGuids);
+    }
+    return added;
+  }
+
+  @Override
+  public boolean removeAlertsFromMetaAlert(String metaAlertGuid, 
List<GetRequest> alertRequests) throws IOException {
+    Collection<String> currentAlertGuids = METAALERT_STORE.get(metaAlertGuid);
+    if (currentAlertGuids == null) {
+      return false;
+    }
+    Collection<String> alertGuids = 
alertRequests.stream().map(GetRequest::getGuid).collect(Collectors.toSet());
+    boolean removed = currentAlertGuids.removeAll(alertGuids);
+    if (removed) {
+      METAALERT_STORE.put(metaAlertGuid, currentAlertGuids);
+    }
+    return removed;
+  }
+
+  /**
+   * Sets the status of the stored meta alert that has the given GUID.
+   * An updated meta alert is removed from its position and appended to the end of the
+   * meta alert list.
+   *
+   * @param metaAlertGuid GUID of the meta alert to update
+   * @param status the status to set
+   * @return true if the meta alert was found and its status changed; false if it was not
+   *     found, already had that status, or no meta alert index exists
+   * @throws IOException if a stored meta alert cannot be loaded as JSON
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public boolean updateMetaAlertStatus(String metaAlertGuid, MetaAlertStatus status)
+      throws IOException {
+    List<String> metaAlerts = InMemoryDao.BACKING_STORE.get(MetaAlertDao.METAALERTS_INDEX);
+    if (metaAlerts == null) {
+      // No meta alert index has been loaded, so there is nothing to update.
+      return false;
+    }
+    for (int i = 0; i < metaAlerts.size(); i++) {
+      JSONObject metaAlertJSON = JSONUtils.INSTANCE.load(metaAlerts.get(i), JSONObject.class);
+      if (!metaAlertGuid.equals(metaAlertJSON.get(GUID))) {
+        continue;
+      }
+      if (status.getStatusString().equals(metaAlertJSON.get(STATUS_FIELD))) {
+        // Found, but the status is already the requested one.
+        return false;
+      }
+      metaAlertJSON.put(STATUS_FIELD, status.getStatusString());
+      // Index-based removal avoids modifying the list through an active iterator.
+      metaAlerts.remove(i);
+      metaAlerts.add(metaAlertJSON.toJSONString());
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Resets test state: clears the InMemoryDao and the meta alert to alert GUID store.
+   */
+  public static void clear() {
+    InMemoryDao.clear();
+    METAALERT_STORE.clear();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
 
b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
index 2961d96..d991d50 100644
--- 
a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
+++ 
b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
@@ -17,6 +17,8 @@
  */
 package org.apache.metron.indexing.dao;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import java.util.Iterator;
 import java.util.Optional;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.common.utils.JSONUtils;
@@ -29,8 +31,8 @@ import org.apache.metron.indexing.dao.search.SearchRequest;
 import org.apache.metron.indexing.dao.search.SearchResponse;
 import org.apache.metron.indexing.dao.search.SearchResult;
 import org.apache.metron.indexing.dao.search.GroupResult;
+import org.apache.metron.indexing.dao.update.Document;
 import org.apache.metron.integration.InMemoryComponent;
-import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -103,6 +105,21 @@ public abstract class SearchIntegrationTest {
   public static String findOneGuidQuery;
 
   /**
+   * [
+   * {
+   * "guid": "bro-1",
+   * "sensorType": "bro"
+   * },
+   * {
+   * "guid": "bro-2",
+   * "sensorType": "bro"
+   * }
+   * ]
+   */
+  @Multiline
+  public static String getAllLatestQuery;
+
+  /**
    * {
    * "indices": ["bro", "snort"],
    * "query": "ip_src_addr:192.168.1.1",
@@ -390,6 +407,19 @@ public abstract class SearchIntegrationTest {
       Assert.assertEquals("bro", doc.get("source:type"));
       Assert.assertEquals(3, doc.get("timestamp"));
     }
+    //Get All Latest Guid Testcase
+    {
+      List<GetRequest> request = JSONUtils.INSTANCE.load(getAllLatestQuery, 
new TypeReference<List<GetRequest>>() {
+      });
+      Iterator<Document> response = dao.getAllLatest(request).iterator();
+      Document bro2 = response.next();
+      Assert.assertEquals("bro_1", bro2.getDocument().get("guid"));
+      Assert.assertEquals("bro", bro2.getDocument().get("source:type"));
+      Document snort2 = response.next();
+      Assert.assertEquals("bro_2", snort2.getDocument().get("guid"));
+      Assert.assertEquals("bro", snort2.getDocument().get("source:type"));
+      Assert.assertFalse(response.hasNext());
+    }
     //Filter test case
     {
       SearchRequest request = JSONUtils.INSTANCE.load(filterQuery, 
SearchRequest.class);

http://git-wip-us.apache.org/repos/asf/metron/blob/fd896fbe/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HBaseDaoIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HBaseDaoIntegrationTest.java
 
b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HBaseDaoIntegrationTest.java
new file mode 100644
index 0000000..aa32aa0
--- /dev/null
+++ 
b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/integration/HBaseDaoIntegrationTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.indexing.integration;
+
+import static org.apache.metron.indexing.dao.HBaseDao.HBASE_CF;
+import static org.apache.metron.indexing.dao.HBaseDao.HBASE_TABLE;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import org.apache.commons.codec.binary.Hex;
+import org.apache.metron.hbase.mock.MockHBaseTableProvider;
+import org.apache.metron.indexing.dao.AccessConfig;
+import org.apache.metron.indexing.dao.HBaseDao;
+import org.apache.metron.indexing.dao.IndexDao;
+import org.apache.metron.indexing.dao.search.GetRequest;
+import org.apache.metron.indexing.dao.update.Document;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Exercises HBaseDao key serialization and document retrieval against a mock HBase table.
+ */
+public class HBaseDaoIntegrationTest {
+
+  private static final String TABLE_NAME = "metron_update";
+  private static final String COLUMN_FAMILY = "cf";
+  private static final String SENSOR_TYPE = "test";
+
+  private static IndexDao hbaseDao;
+
+  // Pinned serialization of Key("guid", "sensorType"): 16 leading bytes (presumably a hash
+  // prefix -- confirm against HBaseDao.Key), then 0x0004 + "guid" and 0x000a + "sensorType".
+  private static final byte[] expectedKeySerialization = new byte[] {
+          (byte)0xf5,0x53,0x76,(byte)0x96,0x67,0x3a,
+          (byte)0xc1,(byte)0xaf,(byte)0xff,0x41,0x33,(byte)0x9d,
+          (byte)0xac,(byte)0xb9,0x1a,(byte)0xb0,0x00,0x04,
+          0x67,0x75,0x69,0x64,0x00,0x0a,
+          0x73,0x65,0x6e,0x73,0x6f,0x72,
+          0x54,0x79,0x70,0x65
+  };
+
+  /**
+   * Builds an HBaseDao backed by a mock table provider, shared by all tests in the class.
+   */
+  @BeforeClass
+  public static void startHBase() throws Exception {
+    AccessConfig accessConfig = new AccessConfig();
+    accessConfig.setMaxSearchResults(1000);
+    accessConfig.setMaxSearchGroups(1000);
+    accessConfig.setGlobalConfigSupplier(() -> new HashMap<String, Object>() {{
+      put(HBASE_TABLE, TABLE_NAME);
+      put(HBASE_CF, COLUMN_FAMILY);
+    }});
+    MockHBaseTableProvider.addToCache(TABLE_NAME, COLUMN_FAMILY);
+    accessConfig.setTableProvider(new MockHBaseTableProvider());
+
+    hbaseDao = new HBaseDao();
+    hbaseDao.init(accessConfig);
+  }
+
+  @After
+  public void clearTable() throws Exception {
+    MockHBaseTableProvider.clear();
+  }
+
+  /**
+   * IF this test fails then you have broken the key serialization in that your change has
+   * caused a key to change serialization, so keys from previous releases will not be able
+   * to be found under your scheme.  Please either provide a migration plan or undo this
+   * change.  DO NOT CHANGE THIS TEST BLITHELY!
+   *
+   * @throws IOException if the key cannot be serialized
+   */
+  @Test
+  public void testKeySerializationRemainsConstant() throws IOException {
+    HBaseDao.Key k = new HBaseDao.Key("guid", "sensorType");
+    byte[] raw = k.toBytes();
+    // JUnit expects (expected, actual); the order matters for the failure message.
+    Assert.assertArrayEquals(expectedKeySerialization, raw);
+  }
+
+
+  /** A key must survive a serialize/deserialize round trip unchanged. */
+  @Test
+  public void testKeySerialization() throws Exception {
+    HBaseDao.Key k = new HBaseDao.Key("guid", "sensorType");
+    Assert.assertEquals(k, HBaseDao.Key.fromBytes(HBaseDao.Key.toBytes(k)));
+  }
+
+  /** Serializing a key with a null GUID is expected to fail with IllegalStateException. */
+  @Test(expected = IllegalStateException.class)
+  public void testKeySerializationWithInvalidGuid() throws Exception {
+    HBaseDao.Key k = new HBaseDao.Key(null, "sensorType");
+    Assert.assertEquals(k, HBaseDao.Key.fromBytes(HBaseDao.Key.toBytes(k)));
+  }
+
+  /** Serializing a key with a null sensor type is expected to fail with IllegalStateException. */
+  @Test(expected = IllegalStateException.class)
+  public void testKeySerializationWithInvalidSensorType() throws Exception {
+    HBaseDao.Key k = new HBaseDao.Key("guid", null);
+    Assert.assertEquals(k, HBaseDao.Key.fromBytes(HBaseDao.Key.toBytes(k)));
+  }
+
+  @Test
+  public void shouldGetLatest() throws Exception {
+    // Load alerts
+    List<Document> alerts = buildAlerts(3);
+    Map<Document, Optional<String>> updates = alerts.stream()
+        .collect(Collectors.toMap(document -> document, document -> Optional.empty()));
+    hbaseDao.batchUpdate(updates);
+
+    Document actualDocument = hbaseDao.getLatest("message_1", SENSOR_TYPE);
+    Document expectedDocument = alerts.get(1);
+    Assert.assertEquals(expectedDocument, actualDocument);
+  }
+
+  @Test
+  public void shouldGetAllLatest() throws Exception {
+    // Load alerts
+    List<Document> alerts = buildAlerts(15);
+    Map<Document, Optional<String>> updates = alerts.stream()
+        .collect(Collectors.toMap(document -> document, document -> Optional.empty()));
+    hbaseDao.batchUpdate(updates);
+
+    // Request message_1 .. message_12 out of the 15 stored alerts.
+    int expectedCount = 12;
+    List<GetRequest> getRequests = new ArrayList<>();
+    for(int i = 1; i < expectedCount + 1; i ++) {
+      getRequests.add(new GetRequest("message_" + i, SENSOR_TYPE));
+    }
+    Iterator<Document> results = hbaseDao.getAllLatest(getRequests).iterator();
+
+    for (int i = 0; i < expectedCount; i++) {
+      Document expectedDocument = alerts.get(i + 1);
+      Document actualDocument = results.next();
+      Assert.assertEquals(expectedDocument, actualDocument);
+    }
+
+    Assert.assertFalse("Result size should be 12 but was greater", results.hasNext());
+  }
+
+  /**
+   * Builds {@code count} alert documents with GUIDs message_0 .. message_(count-1).
+   *
+   * @param count number of alerts to build
+   * @return the alerts, in GUID order
+   * @throws IOException declared because the Document constructor may throw it
+   */
+  protected List<Document> buildAlerts(int count) throws IOException {
+    List<Document> alerts = new ArrayList<>();
+    for (int i = 0; i < count; ++i) {
+      String guid = "message_" + i;
+      String json = "{\"guid\":\"message_" + i + "\", \"source:type\":\"test\"}";
+      Document alert = new Document(json, guid, SENSOR_TYPE, System.currentTimeMillis());
+      alerts.add(alert);
+    }
+    return alerts;
+  }
+
+}

Reply via email to