http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
index 325d42e..dd29af3 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
@@ -19,203 +19,80 @@ package org.apache.metron.elasticsearch.integration;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.collect.Iterables;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.metron.common.Constants;
+import java.io.File;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.elasticsearch.dao.ElasticsearchDao;
 import 
org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
-import org.apache.metron.hbase.mock.MockHTable;
-import org.apache.metron.hbase.mock.MockHBaseTableProvider;
-import org.apache.metron.indexing.dao.*;
-import org.apache.metron.indexing.dao.update.Document;
-import org.apache.metron.indexing.dao.update.ReplaceRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.index.query.QueryBuilders;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.File;
-import java.text.SimpleDateFormat;
-import java.util.*;
+import org.apache.metron.indexing.dao.IndexDao;
+import org.apache.metron.indexing.dao.UpdateIntegrationTest;
+import org.apache.metron.integration.InMemoryComponent;
 
+public class ElasticsearchUpdateIntegrationTest extends UpdateIntegrationTest {
 
-public class ElasticsearchUpdateIntegrationTest {
-  private static final int MAX_RETRIES = 10;
-  private static final int SLEEP_MS = 500;
   private static final String SENSOR_NAME= "test";
-  private static final String TABLE_NAME = "modifications";
-  private static final String CF = "p";
   private static String indexDir = "target/elasticsearch_mutation";
   private static String dateFormat = "yyyy.MM.dd.HH";
   private static String index = SENSOR_NAME + "_index_" + new 
SimpleDateFormat(dateFormat).format(new Date());
-  private static MockHTable table;
-  private static IndexDao esDao;
-  private static IndexDao hbaseDao;
-  private static MultiIndexDao dao;
   private static ElasticSearchComponent es;
 
-  @BeforeClass
-  public static void setup() throws Exception {
-    Configuration config = HBaseConfiguration.create();
-    MockHBaseTableProvider tableProvider = new MockHBaseTableProvider();
-    tableProvider.addToCache(TABLE_NAME, CF);
-    table = (MockHTable)tableProvider.getTable(config, TABLE_NAME);
-    // setup the client
-    es = new ElasticSearchComponent.Builder()
-            .withHttpPort(9211)
-            .withIndexDir(new File(indexDir))
-            .build();
-    es.start();
+  @Override
+  protected String getIndexName() {
+    return SENSOR_NAME + "_index_" + new 
SimpleDateFormat(dateFormat).format(new Date());
+  }
 
-    hbaseDao = new HBaseDao();
-    AccessConfig accessConfig = new AccessConfig();
-    accessConfig.setTableProvider(tableProvider);
-    Map<String, Object> globalConfig = new HashMap<String, Object>() {{
+  @Override
+  protected Map<String, Object> createGlobalConfig() throws Exception {
+    return new HashMap<String, Object>() {{
       put("es.clustername", "metron");
       put("es.port", "9300");
       put("es.ip", "localhost");
       put("es.date.format", dateFormat);
-      put(HBaseDao.HBASE_TABLE, TABLE_NAME);
-      put(HBaseDao.HBASE_CF, CF);
     }};
-    accessConfig.setGlobalConfigSupplier(() -> globalConfig);
-
-    esDao = new ElasticsearchDao();
-
-    dao = new MultiIndexDao(hbaseDao, esDao);
-    dao.init(accessConfig);
+  }
 
+  @Override
+  protected IndexDao createDao() throws Exception {
+    return new ElasticsearchDao();
   }
 
-  @AfterClass
-  public static void teardown() {
-    if(es != null) {
-      es.stop();
-    }
+  @Override
+  protected InMemoryComponent startIndex() throws Exception {
+    es = new ElasticSearchComponent.Builder()
+        .withHttpPort(9211)
+        .withIndexDir(new File(indexDir))
+        .build();
+    es.start();
+    return es;
   }
 
+  @Override
+  protected void loadTestData() throws Exception {
 
+  }
 
-  @Test
-  public void test() throws Exception {
-    List<Map<String, Object>> inputData = new ArrayList<>();
-    for(int i = 0; i < 10;++i) {
-      final String name = "message" + i;
-      inputData.add(
-              new HashMap<String, Object>() {{
-                put("source:type", SENSOR_NAME);
-                put("name" , name);
-                put("timestamp", System.currentTimeMillis());
-                put(Constants.GUID, name);
-              }}
-                             );
-    }
+  @Override
+  protected void addTestData(String indexName, String sensorType,
+      List<Map<String, Object>> docs) throws Exception {
     es.add(index, SENSOR_NAME
-          , Iterables.transform(inputData,
-                    m -> {
-                      try {
-                        return JSONUtils.INSTANCE.toJSON(m, true);
-                      } catch (JsonProcessingException e) {
-                        throw new IllegalStateException(e.getMessage(), e);
-                      }
-                    }
-                    )
+        , Iterables.transform(docs,
+            m -> {
+              try {
+                return JSONUtils.INSTANCE.toJSON(m, true);
+              } catch (JsonProcessingException e) {
+                throw new IllegalStateException(e.getMessage(), e);
+              }
+            }
+        )
     );
-    List<Map<String,Object>> docs = null;
-    for(int t = 0;t < MAX_RETRIES;++t, Thread.sleep(SLEEP_MS)) {
-      docs = es.getAllIndexedDocs(index, SENSOR_NAME + "_doc");
-      if(docs.size() >= 10) {
-        break;
-      }
-    }
-    Assert.assertEquals(10, docs.size());
-    //modify the first message and add a new field
-    {
-      Map<String, Object> message0 = new HashMap<String, 
Object>(inputData.get(0)) {{
-        put("new-field", "metron");
-      }};
-      String guid = "" + message0.get(Constants.GUID);
-      dao.replace(new ReplaceRequest(){{
-        setReplacement(message0);
-        setGuid(guid);
-        setSensorType(SENSOR_NAME);
-      }}, Optional.empty());
-
-      Assert.assertEquals(1, table.size());
-      Document doc = dao.getLatest(guid, SENSOR_NAME);
-      Assert.assertEquals(message0, doc.getDocument());
-      {
-        //ensure hbase is up to date
-        Get g = new Get(HBaseDao.Key.toBytes(new HBaseDao.Key(guid, 
SENSOR_NAME)));
-        Result r = table.get(g);
-        NavigableMap<byte[], byte[]> columns = r.getFamilyMap(CF.getBytes());
-        Assert.assertEquals(1, columns.size());
-        Assert.assertEquals(message0
-                , JSONUtils.INSTANCE.load(new 
String(columns.lastEntry().getValue())
-                        , JSONUtils.MAP_SUPPLIER)
-        );
-      }
-      {
-        //ensure ES is up-to-date
-        long cnt = 0;
-        for (int t = 0; t < MAX_RETRIES && cnt == 0; ++t, 
Thread.sleep(SLEEP_MS)) {
-          docs = es.getAllIndexedDocs(index, SENSOR_NAME + "_doc");
-          cnt = docs
-                  .stream()
-                  .filter(d -> 
message0.get("new-field").equals(d.get("new-field")))
-                  .count();
-        }
-        Assert.assertNotEquals("Elasticsearch is not updated!", cnt, 0);
-      }
-    }
-    //modify the same message and modify the new field
-    {
-      Map<String, Object> message0 = new HashMap<String, 
Object>(inputData.get(0)) {{
-        put("new-field", "metron2");
-      }};
-      String guid = "" + message0.get(Constants.GUID);
-      dao.replace(new ReplaceRequest(){{
-        setReplacement(message0);
-        setGuid(guid);
-        setSensorType(SENSOR_NAME);
-      }}, Optional.empty());
-      Assert.assertEquals(1, table.size());
-      Document doc = dao.getLatest(guid, SENSOR_NAME);
-      Assert.assertEquals(message0, doc.getDocument());
-      {
-        //ensure hbase is up to date
-        Get g = new Get(HBaseDao.Key.toBytes(new HBaseDao.Key(guid, 
SENSOR_NAME)));
-        Result r = table.get(g);
-        NavigableMap<byte[], byte[]> columns = r.getFamilyMap(CF.getBytes());
-        Assert.assertEquals(2, columns.size());
-        Assert.assertEquals(message0, JSONUtils.INSTANCE.load(new 
String(columns.lastEntry().getValue())
-                        , JSONUtils.MAP_SUPPLIER)
-        );
-        Assert.assertNotEquals(message0, JSONUtils.INSTANCE.load(new 
String(columns.firstEntry().getValue())
-                        , JSONUtils.MAP_SUPPLIER)
-        );
-      }
-      {
-        //ensure ES is up-to-date
-        long cnt = 0;
-        for (int t = 0; t < MAX_RETRIES && cnt == 0; 
++t,Thread.sleep(SLEEP_MS)) {
-          docs = es.getAllIndexedDocs(index, SENSOR_NAME + "_doc");
-          cnt = docs
-                  .stream()
-                  .filter(d -> 
message0.get("new-field").equals(d.get("new-field")))
-                  .count();
-        }
-
-        Assert.assertNotEquals("Elasticsearch is not updated!", cnt, 0);
-      }
-    }
   }
 
-
-
+  @Override
+  protected List<Map<String, Object>> getIndexedTestData(String indexName, 
String sensorType) throws Exception {
+    return es.getAllIndexedDocs(index, SENSOR_NAME + "_doc");
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/ColumnMetadataDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/ColumnMetadataDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/ColumnMetadataDao.java
new file mode 100644
index 0000000..3610574
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/ColumnMetadataDao.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.indexing.dao;
+
+import org.apache.metron.indexing.dao.search.FieldType;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Responsible for retrieving column-level metadata about search indices.
+ */
+public interface ColumnMetadataDao {
+
+  /**
+   * Retrieves column metadata for one or more search indices.
+   * @param indices The search indices to retrieve column metadata for.
+   * @return The column metadata, one set for each search index.
+   * @throws IOException
+   */
+  Map<String, FieldType> getColumnMetadata(List<String> indices) throws 
IOException;
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchDao.java
new file mode 100644
index 0000000..eee91ae
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/search/SearchDao.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.indexing.dao.search;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.metron.indexing.dao.update.Document;
+
+public interface SearchDao {
+
+  SearchResponse search(SearchRequest searchRequest) throws 
InvalidSearchException;
+
+  GroupResponse group(GroupRequest groupRequest) throws InvalidSearchException;
+
+  Document getLatest(String guid, String sensorType) throws IOException;
+
+  Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws 
IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/UpdateDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/UpdateDao.java b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/UpdateDao.java
new file mode 100644
index 0000000..ca21b62
--- /dev/null
+++ b/metron-platform/metron-indexing/src/main/java/org/apache/metron/indexing/dao/update/UpdateDao.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.indexing.dao.update;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+
+public interface UpdateDao {
+
+  void update(Document update, Optional<String> index) throws IOException;
+
+  void batchUpdate(Map<Document, Optional<String>> updates) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
index b40db46..b0d0b97 100644
--- a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
+++ b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/SearchIntegrationTest.java
@@ -19,7 +19,6 @@
 package org.apache.metron.indexing.dao;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -60,8 +59,8 @@ public abstract class SearchIntegrationTest {
 
   /**
    * [
-   * {"source:type": "snort", "ip_src_addr":"192.168.1.6", "ip_src_port": 
8005, "long_field": 10000, "timestamp":6, "latitude": 48.5839, "score": 50.0, 
"is_alert":false, "location_point": "50.0,7.7455", "snort_field": 10, 
"duplicate_name_field": 1, "guid":"snort_1", "threat:triage:score":"10"},
-   * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 
8004, "long_field": 10000, "timestamp":7, "latitude": 48.5839, "score": 10.0, 
"is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 20, 
"duplicate_name_field": 2, "guid":"snort_2", "threat:triage:score":"20"},
+   * {"source:type": "snort", "ip_src_addr":"192.168.1.6", "ip_src_port": 
8005, "long_field": 10000, "timestamp":6, "latitude": 48.5839, "score": 50.0, 
"is_alert":false, "location_point": "50.0,7.7455", "snort_field": 10, 
"duplicate_name_field": 1, "guid":"snort_1", "threat:triage:score":10.0},
+   * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 
8004, "long_field": 10000, "timestamp":7, "latitude": 48.5839, "score": 10.0, 
"is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 20, 
"duplicate_name_field": 2, "guid":"snort_2", "threat:triage:score":20.0},
    * {"source:type": "snort", "ip_src_addr":"192.168.1.7", "ip_src_port": 
8003, "long_field": 10000, "timestamp":8, "latitude": 48.5839, "score": 20.0, 
"is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 30, 
"duplicate_name_field": 3, "guid":"snort_3"},
    * {"source:type": "snort", "ip_src_addr":"192.168.1.1", "ip_src_port": 
8002, "long_field": 20000, "timestamp":9, "latitude": 48.0001, "score": 50.0, 
"is_alert":true, "location_point": "48.5839,7.7455", "snort_field": 40, 
"duplicate_name_field": 4, "guid":"snort_4"},
    * {"source:type": "snort", "ip_src_addr":"192.168.1.8", "ip_src_port": 
8001, "long_field": 10000, "timestamp":10, "latitude": 48.5839, "score": 10.0, 
"is_alert":false, "location_point": "48.5839,7.7455", "snort_field": 50, 
"duplicate_name_field": 5, "guid":"snort_5"}
@@ -274,6 +273,42 @@ public abstract class SearchIntegrationTest {
 
   /**
    * {
+   * "facetFields": ["snort_field"],
+   * "indices": ["bro", "snort"],
+   * "query": "*:*",
+   * "from": 0,
+   * "size": 10,
+   * "sort": [
+   *   {
+   *     "field": "timestamp",
+   *     "sortOrder": "desc"
+   *   }
+   * ]
+   * }
+   */
+  @Multiline
+  public static String missingTypeFacetQuery;
+
+  /**
+   * {
+   * "facetFields": ["duplicate_name_field"],
+   * "indices": ["bro", "snort"],
+   * "query": "*:*",
+   * "from": 0,
+   * "size": 10,
+   * "sort": [
+   *   {
+   *     "field": "timestamp",
+   *     "sortOrder": "desc"
+   *   }
+   * ]
+   * }
+   */
+  @Multiline
+  public static String differentTypeFacetQuery;
+
+  /**
+   * {
    * "indices": ["bro", "snort"],
    * "query": "*",
    * "from": 0,
@@ -419,6 +454,23 @@ public abstract class SearchIntegrationTest {
   @Multiline
   public static String groupByIpQuery;
 
+  /**
+   * {
+   * "indices": ["bro", "snort"],
+   * "query": "duplicate_name_field:\"data 1\"",
+   * "from": 0,
+   * "size": 10,
+   * "sort": [
+   *   {
+   *     "field": "timestamp",
+   *     "sortOrder": "desc"
+   *   }
+   * ]
+   * }
+   */
+  @Multiline
+  public static String differentTypeFilterQuery;
+
   protected static IndexDao dao;
   protected static InMemoryComponent indexComponent;
 
@@ -443,11 +495,11 @@ public abstract class SearchIntegrationTest {
     Assert.assertEquals(10, results.size());
     for(int i = 0;i < 5;++i) {
       Assert.assertEquals("snort", 
results.get(i).getSource().get("source:type"));
-      Assert.assertEquals(10 - i, results.get(i).getSource().get("timestamp"));
+      Assert.assertEquals(10 - i + "", 
results.get(i).getSource().get("timestamp").toString());
     }
     for (int i = 5; i < 10; ++i) {
       Assert.assertEquals("bro", 
results.get(i).getSource().get("source:type"));
-      Assert.assertEquals(10 - i, results.get(i).getSource().get("timestamp"));
+      Assert.assertEquals(10 - i + "", 
results.get(i).getSource().get("timestamp").toString());
     }
   }
 
@@ -458,7 +510,7 @@ public abstract class SearchIntegrationTest {
     Assert.assertTrue(response.isPresent());
     Map<String, Object> doc = response.get();
     Assert.assertEquals("bro", doc.get("source:type"));
-    Assert.assertEquals(3, doc.get("timestamp"));
+    Assert.assertEquals("3", doc.get("timestamp").toString());
   }
 
   @Test
@@ -483,11 +535,11 @@ public abstract class SearchIntegrationTest {
     Assert.assertEquals(3, response.getTotal());
     List<SearchResult> results = response.getResults();
     Assert.assertEquals("snort", 
results.get(0).getSource().get("source:type"));
-    Assert.assertEquals(9, results.get(0).getSource().get("timestamp"));
+    Assert.assertEquals("9", 
results.get(0).getSource().get("timestamp").toString());
     Assert.assertEquals("snort", 
results.get(1).getSource().get("source:type"));
-    Assert.assertEquals(7, results.get(1).getSource().get("timestamp"));
+    Assert.assertEquals("7", 
results.get(1).getSource().get("timestamp").toString());
     Assert.assertEquals("bro", results.get(2).getSource().get("source:type"));
-    Assert.assertEquals(1, results.get(2).getSource().get("timestamp"));
+    Assert.assertEquals("1", 
results.get(2).getSource().get("timestamp").toString());
   }
 
   @Test
@@ -515,8 +567,8 @@ public abstract class SearchIntegrationTest {
     }
 
     // validate sorted order - there are only 2 with a 'threat:triage:score'
-    Assert.assertEquals("10", 
results.get(8).getSource().get("threat:triage:score"));
-    Assert.assertEquals("20", 
results.get(9).getSource().get("threat:triage:score"));
+    Assert.assertEquals("10.0", 
results.get(8).getSource().get("threat:triage:score").toString());
+    Assert.assertEquals("20.0", 
results.get(9).getSource().get("threat:triage:score").toString());
   }
 
   @Test
@@ -528,8 +580,8 @@ public abstract class SearchIntegrationTest {
     Assert.assertEquals(10, results.size());
 
     // validate sorted order - there are only 2 with a 'threat:triage:score'
-    Assert.assertEquals("20", 
results.get(0).getSource().get("threat:triage:score"));
-    Assert.assertEquals("10", 
results.get(1).getSource().get("threat:triage:score"));
+    Assert.assertEquals("20.0", 
results.get(0).getSource().get("threat:triage:score").toString());
+    Assert.assertEquals("10.0", 
results.get(1).getSource().get("threat:triage:score").toString());
 
     // the remaining are missing the 'threat:triage:score' and should be 
sorted last
     for (int i = 2; i < 10; i++) {
@@ -545,11 +597,11 @@ public abstract class SearchIntegrationTest {
     List<SearchResult> results = response.getResults();
     Assert.assertEquals(3, results.size());
     Assert.assertEquals("snort", 
results.get(0).getSource().get("source:type"));
-    Assert.assertEquals(6, results.get(0).getSource().get("timestamp"));
+    Assert.assertEquals("6", 
results.get(0).getSource().get("timestamp").toString());
     Assert.assertEquals("bro", results.get(1).getSource().get("source:type"));
-    Assert.assertEquals(5, results.get(1).getSource().get("timestamp"));
+    Assert.assertEquals("5", 
results.get(1).getSource().get("timestamp").toString());
     Assert.assertEquals("bro", results.get(2).getSource().get("source:type"));
-    Assert.assertEquals(4, results.get(2).getSource().get("timestamp"));
+    Assert.assertEquals("4", 
results.get(2).getSource().get("timestamp").toString());
   }
 
   @Test
@@ -560,7 +612,7 @@ public abstract class SearchIntegrationTest {
     List<SearchResult> results = response.getResults();
     for (int i = 5, j = 0; i > 0; i--, j++) {
       Assert.assertEquals("bro", 
results.get(j).getSource().get("source:type"));
-      Assert.assertEquals(i, results.get(j).getSource().get("timestamp"));
+      Assert.assertEquals(i + "", 
results.get(j).getSource().get("timestamp").toString());
     }
   }
 
@@ -640,14 +692,6 @@ public abstract class SearchIntegrationTest {
   }
 
   @Test
-  public void bad_facet_query_throws_exception() throws Exception {
-    thrown.expect(InvalidSearchException.class);
-    thrown.expectMessage("Failed to execute search");
-    SearchRequest request = JSONUtils.INSTANCE.load(badFacetQuery, 
SearchRequest.class);
-    dao.search(request);
-  }
-
-  @Test
   public void disabled_facet_query_returns_null_count() throws Exception {
     SearchRequest request = JSONUtils.INSTANCE.load(disabledFacetQuery, 
SearchRequest.class);
     SearchResponse response = dao.search(request);
@@ -655,7 +699,33 @@ public abstract class SearchIntegrationTest {
   }
 
   @Test
-  public void exceeding_max_resulsts_throws_exception() throws Exception {
+  public void missing_type_facet_query() throws Exception {
+    SearchRequest request = JSONUtils.INSTANCE.load(missingTypeFacetQuery, 
SearchRequest.class);
+    SearchResponse response = dao.search(request);
+    Assert.assertEquals(10, response.getTotal());
+
+    Map<String, Map<String, Long>> facetCounts = response.getFacetCounts();
+    Assert.assertEquals(1, facetCounts.size());
+    Map<String, Long> snortFieldCounts = facetCounts.get("snort_field");
+    Assert.assertEquals(5, snortFieldCounts.size());
+    Assert.assertEquals(1L, snortFieldCounts.get("50").longValue());
+    Assert.assertEquals(1L, snortFieldCounts.get("40").longValue());
+    Assert.assertEquals(1L, snortFieldCounts.get("30").longValue());
+    Assert.assertEquals(1L, snortFieldCounts.get("20").longValue());
+    Assert.assertEquals(1L, snortFieldCounts.get("10").longValue());
+    response.getFacetCounts();
+  }
+
+  @Test
+  public void different_type_facet_query() throws Exception {
+    thrown.expect(Exception.class);
+    SearchRequest request = JSONUtils.INSTANCE.load(differentTypeFacetQuery, 
SearchRequest.class);
+    SearchResponse response = dao.search(request);
+    Assert.assertEquals(3, response.getTotal());
+  }
+
+  @Test
+  public void exceeding_max_results_throws_exception() throws Exception {
     thrown.expect(InvalidSearchException.class);
     thrown.expectMessage("Search result size must be less than 100");
     SearchRequest request = JSONUtils.INSTANCE.load(exceededMaxResultsQuery, 
SearchRequest.class);
@@ -663,68 +733,7 @@ public abstract class SearchIntegrationTest {
   }
 
   @Test
-  public void returns_column_data_for_multiple_indices() throws Exception {
-    Map<String, FieldType> fieldTypes = 
dao.getColumnMetadata(Arrays.asList("bro", "snort"));
-    Assert.assertEquals(15, fieldTypes.size());
-    Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("guid"));
-    Assert.assertEquals(FieldType.TEXT, fieldTypes.get("source:type"));
-    Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr"));
-    Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port"));
-    Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field"));
-    Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp"));
-    Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude"));
-    Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score"));
-    Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert"));
-    Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point"));
-    Assert.assertEquals(FieldType.TEXT, fieldTypes.get("bro_field"));
-    Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("snort_field"));
-    //NOTE: This is because the field is in both bro and snort and they have 
different types.
-    Assert.assertEquals(FieldType.OTHER, 
fieldTypes.get("duplicate_name_field"));
-    Assert.assertEquals(FieldType.FLOAT, 
fieldTypes.get("threat:triage:score"));
-    Assert.assertEquals(FieldType.OTHER, fieldTypes.get("alert"));
-  }
-
-  @Test
-  public void returns_column_metadata_for_specified_indices() throws Exception 
{
-    // getColumnMetadata with only bro
-    {
-      Map<String, FieldType> fieldTypes = 
dao.getColumnMetadata(Collections.singletonList("bro"));
-      Assert.assertEquals(13, fieldTypes.size());
-      Assert.assertEquals(FieldType.TEXT, fieldTypes.get("bro_field"));
-      Assert.assertEquals(FieldType.TEXT, 
fieldTypes.get("duplicate_name_field"));
-      Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("guid"));
-      Assert.assertEquals(FieldType.TEXT, fieldTypes.get("source:type"));
-      Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr"));
-      Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port"));
-      Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field"));
-      Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp"));
-      Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude"));
-      Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score"));
-      Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert"));
-      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point"));
-      Assert.assertEquals(FieldType.TEXT, fieldTypes.get("bro_field"));
-      Assert.assertEquals(FieldType.TEXT, 
fieldTypes.get("duplicate_name_field"));
-      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("alert"));
-    }
-    // getColumnMetadata with only snort
-    {
-      Map<String, FieldType> fieldTypes = 
dao.getColumnMetadata(Collections.singletonList("snort"));
-      Assert.assertEquals(14, fieldTypes.size());
-      Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("snort_field"));
-      Assert.assertEquals(FieldType.INTEGER, 
fieldTypes.get("duplicate_name_field"));
-      Assert.assertEquals(FieldType.KEYWORD, fieldTypes.get("guid"));
-      Assert.assertEquals(FieldType.TEXT, fieldTypes.get("source:type"));
-      Assert.assertEquals(FieldType.IP, fieldTypes.get("ip_src_addr"));
-      Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port"));
-      Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field"));
-      Assert.assertEquals(FieldType.DATE, fieldTypes.get("timestamp"));
-      Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude"));
-      Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score"));
-      Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert"));
-      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point"));
-      Assert.assertEquals(FieldType.INTEGER, 
fieldTypes.get("duplicate_name_field"));
-      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("alert"));
-    }
+  public void column_metadata_for_missing_index() throws Exception {
     // getColumnMetadata with an index that doesn't exist
     {
       Map<String, FieldType> fieldTypes = 
dao.getColumnMetadata(Collections.singletonList("someindex"));
@@ -732,6 +741,12 @@ public abstract class SearchIntegrationTest {
     }
   }
 
+  @Test
+  public void no_results_returned_when_query_does_not_match() throws Exception 
{
+    SearchRequest request = JSONUtils.INSTANCE.load(noResultsFieldsQuery, 
SearchRequest.class);
+    SearchResponse response = dao.search(request);
+    Assert.assertEquals(0, response.getTotal());
+  }
 
   @Test
   public void group_by_ip_query() throws Exception {
@@ -757,13 +772,6 @@ public abstract class SearchIntegrationTest {
   }
 
   @Test
-  public void no_results_returned_when_query_does_not_match() throws Exception 
{
-    SearchRequest request = JSONUtils.INSTANCE.load(noResultsFieldsQuery, 
SearchRequest.class);
-    SearchResponse response = dao.search(request);
-    Assert.assertEquals(0, response.getTotal());
-  }
-
-  @Test
   public void group_by_returns_results_in_groups() throws Exception {
     // Group by test case, default order is count descending
     GroupRequest request = JSONUtils.INSTANCE.load(groupByQuery, 
GroupRequest.class);
@@ -896,15 +904,6 @@ public abstract class SearchIntegrationTest {
   }
 
   @Test
-  public void 
throws_exception_on_aggregation_queries_on_non_string_non_numeric_fields()
-          throws Exception {
-    thrown.expect(InvalidSearchException.class);
-    thrown.expectMessage("Failed to execute search");
-    GroupRequest request = JSONUtils.INSTANCE.load(badGroupQuery, 
GroupRequest.class);
-    dao.group(request);
-  }
-
-  @Test
   public void queries_fields() throws Exception {
     SearchRequest request = JSONUtils.INSTANCE.load(fieldsQuery, 
SearchRequest.class);
     SearchResponse response = dao.search(request);
@@ -940,6 +939,14 @@ public abstract class SearchIntegrationTest {
     indexComponent.stop();
   }
 
+  @Test
+  public abstract void returns_column_data_for_multiple_indices() throws 
Exception;
+  @Test
+  public abstract void returns_column_metadata_for_specified_indices() throws 
Exception;
+  @Test
+  public abstract void different_type_filter_query() throws Exception;
+
+
   protected abstract IndexDao createDao() throws Exception;
   protected abstract InMemoryComponent startIndex() throws Exception;
   protected abstract void loadTestData() throws Exception;

http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/UpdateIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/UpdateIntegrationTest.java
 
b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/UpdateIntegrationTest.java
new file mode 100644
index 0000000..369fa79
--- /dev/null
+++ 
b/metron-platform/metron-indexing/src/test/java/org/apache/metron/indexing/dao/UpdateIntegrationTest.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership.  The ASF licenses this file to you under the Apache 
License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with the 
License.  You may obtain
+ * a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.metron.indexing.dao;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.hbase.mock.MockHBaseTableProvider;
+import org.apache.metron.hbase.mock.MockHTable;
+import org.apache.metron.indexing.dao.update.Document;
+import org.apache.metron.indexing.dao.update.ReplaceRequest;
+import org.apache.metron.integration.InMemoryComponent;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public abstract class UpdateIntegrationTest {
+
+  private static final int MAX_RETRIES = 10;
+  private static final int SLEEP_MS = 500;
+  protected static final String SENSOR_NAME= "test";
+  private static final String TABLE_NAME = "modifications";
+  private static final String CF = "p";
+  private static String index;
+  private static MockHTable table;
+  private static IndexDao hbaseDao;
+
+  protected static MultiIndexDao dao;
+  protected static InMemoryComponent indexComponent;
+
+  @Before
+  public void setup() throws Exception {
+    if(dao == null && indexComponent == null) {
+      index = getIndexName();
+      indexComponent = startIndex();
+      loadTestData();
+      Configuration config = HBaseConfiguration.create();
+      MockHBaseTableProvider tableProvider = new MockHBaseTableProvider();
+      tableProvider.addToCache(TABLE_NAME, CF);
+      table = (MockHTable)tableProvider.getTable(config, TABLE_NAME);
+
+      hbaseDao = new HBaseDao();
+      AccessConfig accessConfig = new AccessConfig();
+      accessConfig.setTableProvider(tableProvider);
+      Map<String, Object> globalConfig = createGlobalConfig();
+      globalConfig.put(HBaseDao.HBASE_TABLE, TABLE_NAME);
+      globalConfig.put(HBaseDao.HBASE_CF, CF);
+      accessConfig.setGlobalConfigSupplier(() -> globalConfig);
+
+      dao = new MultiIndexDao(hbaseDao, createDao());
+      dao.init(accessConfig);
+    }
+  }
+
+  @Test
+  public void test() throws Exception {
+    List<Map<String, Object>> inputData = new ArrayList<>();
+    for(int i = 0; i < 10;++i) {
+      final String name = "message" + i;
+      inputData.add(
+          new HashMap<String, Object>() {{
+            put("source:type", SENSOR_NAME);
+            put("name" , name);
+            put("timestamp", System.currentTimeMillis());
+            put(Constants.GUID, name);
+          }}
+      );
+    }
+    addTestData(index, SENSOR_NAME, inputData);
+    List<Map<String,Object>> docs = null;
+    for(int t = 0;t < MAX_RETRIES;++t, Thread.sleep(SLEEP_MS)) {
+      docs = getIndexedTestData(index, SENSOR_NAME);
+      if(docs.size() >= 10) {
+        break;
+      }
+    }
+    Assert.assertEquals(10, docs.size());
+    //modify the first message and add a new field
+    {
+      Map<String, Object> message0 = new HashMap<String, 
Object>(inputData.get(0)) {{
+        put("new-field", "metron");
+      }};
+      String guid = "" + message0.get(Constants.GUID);
+      dao.replace(new ReplaceRequest(){{
+        setReplacement(message0);
+        setGuid(guid);
+        setSensorType(SENSOR_NAME);
+        setIndex(index);
+      }}, Optional.empty());
+
+      Assert.assertEquals(1, table.size());
+      Document doc = dao.getLatest(guid, SENSOR_NAME);
+      Assert.assertEquals(message0, doc.getDocument());
+      {
+        //ensure hbase is up to date
+        Get g = new Get(HBaseDao.Key.toBytes(new HBaseDao.Key(guid, 
SENSOR_NAME)));
+        Result r = table.get(g);
+        NavigableMap<byte[], byte[]> columns = r.getFamilyMap(CF.getBytes());
+        Assert.assertEquals(1, columns.size());
+        Assert.assertEquals(message0
+            , JSONUtils.INSTANCE.load(new 
String(columns.lastEntry().getValue())
+                , JSONUtils.MAP_SUPPLIER)
+        );
+      }
+      {
+        //ensure ES is up-to-date
+        long cnt = 0;
+        for (int t = 0; t < MAX_RETRIES && cnt == 0; ++t, 
Thread.sleep(SLEEP_MS)) {
+          docs = getIndexedTestData(index, SENSOR_NAME);
+          cnt = docs
+              .stream()
+              .filter(d -> 
message0.get("new-field").equals(d.get("new-field")))
+              .count();
+        }
+        Assert.assertNotEquals("Elasticsearch is not updated!", cnt, 0);
+      }
+    }
+    //modify the same message and modify the new field
+    {
+      Map<String, Object> message0 = new HashMap<String, 
Object>(inputData.get(0)) {{
+        put("new-field", "metron2");
+      }};
+      String guid = "" + message0.get(Constants.GUID);
+      dao.replace(new ReplaceRequest(){{
+        setReplacement(message0);
+        setGuid(guid);
+        setSensorType(SENSOR_NAME);
+        setIndex(index);
+      }}, Optional.empty());
+      Assert.assertEquals(1, table.size());
+      Document doc = dao.getLatest(guid, SENSOR_NAME);
+      Assert.assertEquals(message0, doc.getDocument());
+      {
+        //ensure hbase is up to date
+        Get g = new Get(HBaseDao.Key.toBytes(new HBaseDao.Key(guid, 
SENSOR_NAME)));
+        Result r = table.get(g);
+        NavigableMap<byte[], byte[]> columns = r.getFamilyMap(CF.getBytes());
+        Assert.assertEquals(2, columns.size());
+        Assert.assertEquals(message0, JSONUtils.INSTANCE.load(new 
String(columns.lastEntry().getValue())
+            , JSONUtils.MAP_SUPPLIER)
+        );
+        Assert.assertNotEquals(message0, JSONUtils.INSTANCE.load(new 
String(columns.firstEntry().getValue())
+            , JSONUtils.MAP_SUPPLIER)
+        );
+      }
+      {
+        //ensure ES is up-to-date
+        long cnt = 0;
+        for (int t = 0; t < MAX_RETRIES && cnt == 0; 
++t,Thread.sleep(SLEEP_MS)) {
+          docs = getIndexedTestData(index, SENSOR_NAME);
+          cnt = docs
+              .stream()
+              .filter(d -> 
message0.get("new-field").equals(d.get("new-field")))
+              .count();
+        }
+
+        Assert.assertNotEquals("Elasticsearch is not updated!", cnt, 0);
+      }
+    }
+  }
+
+  @AfterClass
+  public static void teardown() {
+    if(indexComponent != null) {
+      indexComponent.stop();
+    }
+  }
+
+  protected abstract String getIndexName();
+  protected abstract Map<String, Object> createGlobalConfig() throws Exception;
+  protected abstract IndexDao createDao() throws Exception;
+  protected abstract InMemoryComponent startIndex() throws Exception;
+  protected abstract void loadTestData() throws Exception;
+  protected abstract void addTestData(String indexName, String sensorType, 
List<Map<String,Object>> docs) throws Exception;
+  protected abstract List<Map<String,Object>> getIndexedTestData(String 
indexName, String sensorType) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-solr/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-solr/pom.xml 
b/metron-platform/metron-solr/pom.xml
index 9c9c7fb..9b2e806 100644
--- a/metron-platform/metron-solr/pom.xml
+++ b/metron-platform/metron-solr/pom.xml
@@ -63,6 +63,14 @@
                     <artifactId>fastutil</artifactId>
                     <groupId>it.unimi.dsi</groupId>
                 </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-core</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-databind</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
@@ -192,11 +200,15 @@
                     <groupId>org.apache.hadoop</groupId>
                     <artifactId>hadoop-common</artifactId>
                 </exclusion>
+                <exclusion>
+                  <groupId>org.hamcrest</groupId>
+                  <artifactId>hamcrest-core</artifactId>
+                </exclusion>
             </exclusions>
         </dependency>
         <dependency>
             <groupId>org.mockito</groupId>
-            <artifactId>mockito-all</artifactId>
+            <artifactId>mockito-core</artifactId>
             <version>${global_mockito_version}</version>
             <scope>test</scope>
         </dependency>
@@ -231,6 +243,29 @@
                 </exclusion>
             </exclusions>
         </dependency>
+      <dependency>
+        <groupId>org.apache.metron</groupId>
+        <artifactId>metron-hbase</artifactId>
+        <version>${project.parent.version}</version>
+        <scope>test</scope>
+        <type>test-jar</type>
+        <exclusions>
+          <exclusion>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+          </exclusion>
+        </exclusions>
+      </dependency>
+      <dependency>
+        <groupId>org.hamcrest</groupId>
+        <artifactId>hamcrest-core</artifactId>
+        <version>1.3</version>
+        <scope>test</scope>
+      </dependency>
 
     </dependencies>
 

http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrColumnMetadataDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrColumnMetadataDao.java
 
b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrColumnMetadataDao.java
new file mode 100644
index 0000000..f645e93
--- /dev/null
+++ 
b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrColumnMetadataDao.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.solr.dao;
+
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.metron.indexing.dao.ColumnMetadataDao;
+import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.schema.SchemaRequest;
+import org.apache.solr.client.solrj.response.schema.SchemaRepresentation;
+import org.apache.solr.common.SolrException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SolrColumnMetadataDao implements ColumnMetadataDao {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private static Map<String, FieldType> solrTypeMap;
+
+  static {
+    Map<String, FieldType> fieldTypeMap = new HashMap<>();
+    fieldTypeMap.put("string", FieldType.TEXT);
+    fieldTypeMap.put("pint", FieldType.INTEGER);
+    fieldTypeMap.put("plong", FieldType.LONG);
+    fieldTypeMap.put("pfloat", FieldType.FLOAT);
+    fieldTypeMap.put("pdouble", FieldType.DOUBLE);
+    fieldTypeMap.put("boolean", FieldType.BOOLEAN);
+    solrTypeMap = Collections.unmodifiableMap(fieldTypeMap);
+  }
+
+  private String zkHost;
+
+  public SolrColumnMetadataDao(String zkHost) {
+    this.zkHost = zkHost;
+  }
+
+  @Override
+  public Map<String, FieldType> getColumnMetadata(List<String> indices) throws 
IOException {
+    Map<String, FieldType> indexColumnMetadata = new HashMap<>();
+    Map<String, String> previousIndices = new HashMap<>();
+    Set<String> fieldBlackList = Sets.newHashSet(SolrDao.ROOT_FIELD, 
SolrDao.VERSION_FIELD);
+
+    for (String index : indices) {
+      CloudSolrClient client = new 
CloudSolrClient.Builder().withZkHost(zkHost).build();
+      client.setDefaultCollection(index);
+      try {
+        SchemaRepresentation schemaRepresentation = new 
SchemaRequest().process(client)
+            .getSchemaRepresentation();
+        schemaRepresentation.getFields().stream().forEach(field -> {
+          String name = (String) field.get("name");
+          if (!fieldBlackList.contains(name)) {
+            FieldType type = toFieldType((String) field.get("type"));
+            if (!indexColumnMetadata.containsKey(name)) {
+              indexColumnMetadata.put(name, type);
+
+              // record the last index in which a field exists, to be able to 
print helpful error message on type mismatch
+              previousIndices.put(name, index);
+            } else {
+              FieldType previousType = indexColumnMetadata.get(name);
+              if (!type.equals(previousType)) {
+                String previousIndexName = previousIndices.get(name);
+                LOG.error(String.format(
+                    "Field type mismatch: %s.%s has type %s while %s.%s has 
type %s.  Defaulting type to %s.",
+                    index, field, type.getFieldType(),
+                    previousIndexName, field, previousType.getFieldType(),
+                    FieldType.OTHER.getFieldType()));
+                indexColumnMetadata.put(name, FieldType.OTHER);
+
+                // the field is defined in multiple indices with different 
types; ignore the field as type has been set to OTHER
+                fieldBlackList.add(name);
+              }
+            }
+          }
+        });
+      } catch (SolrServerException e) {
+        throw new IOException(e);
+      } catch (SolrException e) {
+        // 400 means an index is missing so continue
+        if (e.code() != 400) {
+          throw new IOException(e);
+        }
+      }
+    }
+    return indexColumnMetadata;
+  }
+
+  /**
+   * Converts a string type to the corresponding FieldType.
+   *
+   * @param type The type to convert.
+   * @return The corresponding FieldType or FieldType.OTHER, if no match.
+   */
+  private FieldType toFieldType(String type) {
+    return solrTypeMap.getOrDefault(type, FieldType.OTHER);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrDao.java
 
b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrDao.java
new file mode 100644
index 0000000..024755a
--- /dev/null
+++ 
b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrDao.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.solr.dao;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.metron.indexing.dao.AccessConfig;
+import org.apache.metron.indexing.dao.ColumnMetadataDao;
+import org.apache.metron.indexing.dao.IndexDao;
+import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.GetRequest;
+import org.apache.metron.indexing.dao.search.GroupRequest;
+import org.apache.metron.indexing.dao.search.GroupResponse;
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
+import org.apache.metron.indexing.dao.search.SearchRequest;
+import org.apache.metron.indexing.dao.search.SearchResponse;
+import org.apache.metron.indexing.dao.update.Document;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SolrDao implements IndexDao {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  public static final String ROOT_FIELD = "_root_";
+  public static final String VERSION_FIELD = "_version_";
+
+  private transient SolrClient client;
+  private SolrSearchDao solrSearchDao;
+  private SolrUpdateDao solrUpdateDao;
+  private ColumnMetadataDao solrColumnMetadataDao;
+
+  private AccessConfig accessConfig;
+
+  protected SolrDao(SolrClient client,
+      AccessConfig config,
+      SolrSearchDao solrSearchDao,
+      SolrUpdateDao solrUpdateDao,
+      SolrColumnMetadataDao solrColumnMetadataDao) {
+    this.client = client;
+    this.accessConfig = config;
+    this.solrSearchDao = solrSearchDao;
+    this.solrUpdateDao = solrUpdateDao;
+    this.solrColumnMetadataDao = solrColumnMetadataDao;
+  }
+
+  public SolrDao() {
+    //uninitialized.
+  }
+
+  @Override
+  public void init(AccessConfig config) {
+    if (this.client == null) {
+      Map<String, Object> globalConfig = 
config.getGlobalConfigSupplier().get();
+      String zkHost = (String) globalConfig.get("solr.zookeeper");
+      this.client = new CloudSolrClient.Builder().withZkHost((String) 
globalConfig.get("solr.zookeeper")).build();
+      this.accessConfig = config;
+      this.solrSearchDao = new SolrSearchDao(this.client, this.accessConfig);
+      this.solrUpdateDao = new SolrUpdateDao(this.client);
+      this.solrColumnMetadataDao = new SolrColumnMetadataDao(zkHost);
+    }
+  }
+
+  @Override
+  public SearchResponse search(SearchRequest searchRequest) throws 
InvalidSearchException {
+    return this.solrSearchDao.search(searchRequest);
+  }
+
+  @Override
+  public GroupResponse group(GroupRequest groupRequest) throws 
InvalidSearchException {
+    return this.solrSearchDao.group(groupRequest);
+  }
+
+  @Override
+  public Document getLatest(String guid, String collection) throws IOException 
{
+    return this.solrSearchDao.getLatest(guid, collection);
+  }
+
+  @Override
+  public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws 
IOException {
+    return this.solrSearchDao.getAllLatest(getRequests);
+  }
+
+  @Override
+  public void update(Document update, Optional<String> index) throws 
IOException {
+    this.solrUpdateDao.update(update, index);
+  }
+
+  @Override
+  public void batchUpdate(Map<Document, Optional<String>> updates) throws 
IOException {
+    this.solrUpdateDao.batchUpdate(updates);
+  }
+
+  @Override
+  public Map<String, FieldType> getColumnMetadata(List<String> indices) throws 
IOException {
+    return this.solrColumnMetadataDao.getColumnMetadata(indices);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrSearchDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrSearchDao.java
 
b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrSearchDao.java
new file mode 100644
index 0000000..031d1d5
--- /dev/null
+++ 
b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrSearchDao.java
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.solr.dao;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.indexing.dao.AccessConfig;
+import org.apache.metron.indexing.dao.search.GetRequest;
+import org.apache.metron.indexing.dao.search.Group;
+import org.apache.metron.indexing.dao.search.GroupOrder;
+import org.apache.metron.indexing.dao.search.GroupOrderType;
+import org.apache.metron.indexing.dao.search.GroupRequest;
+import org.apache.metron.indexing.dao.search.GroupResponse;
+import org.apache.metron.indexing.dao.search.GroupResult;
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
+import org.apache.metron.indexing.dao.search.SearchDao;
+import org.apache.metron.indexing.dao.search.SearchRequest;
+import org.apache.metron.indexing.dao.search.SearchResponse;
+import org.apache.metron.indexing.dao.search.SearchResult;
+import org.apache.metron.indexing.dao.search.SortField;
+import org.apache.metron.indexing.dao.search.SortOrder;
+import org.apache.metron.indexing.dao.update.Document;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrQuery;
+import org.apache.solr.client.solrj.SolrQuery.ORDER;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.client.solrj.response.FacetField;
+import org.apache.solr.client.solrj.response.FacetField.Count;
+import org.apache.solr.client.solrj.response.PivotField;
+import org.apache.solr.client.solrj.response.QueryResponse;
+import org.apache.solr.common.SolrDocument;
+import org.apache.solr.common.SolrDocumentList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SolrSearchDao implements SearchDao {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private transient SolrClient client;
+  private AccessConfig accessConfig;
+
+  public SolrSearchDao(SolrClient client, AccessConfig accessConfig) {
+    this.client = client;
+    this.accessConfig = accessConfig;
+  }
+
+  @Override
+  public SearchResponse search(SearchRequest searchRequest) throws 
InvalidSearchException {
+    if (searchRequest.getQuery() == null) {
+      throw new InvalidSearchException("Search query is invalid: null");
+    }
+    if (client == null) {
+      throw new InvalidSearchException("Uninitialized Dao!  You must call 
init() prior to use.");
+    }
+    if (searchRequest.getSize() > accessConfig.getMaxSearchResults()) {
+      throw new InvalidSearchException(
+          "Search result size must be less than " + 
accessConfig.getMaxSearchResults());
+    }
+    SolrQuery query = buildSearchRequest(searchRequest);
+    try {
+      QueryResponse response = client.query(query);
+      return buildSearchResponse(searchRequest, response);
+    } catch (IOException | SolrServerException e) {
+      String msg = e.getMessage();
+      LOG.error(msg, e);
+      throw new InvalidSearchException(msg, e);
+    }
+  }
+
+  @Override
+  public GroupResponse group(GroupRequest groupRequest) throws 
InvalidSearchException {
+    String groupNames = 
groupRequest.getGroups().stream().map(Group::getField).collect(
+        Collectors.joining(","));
+    SolrQuery query = new SolrQuery()
+        .setStart(0)
+        .setRows(0)
+        .setQuery(groupRequest.getQuery());
+    query.set("collection", "bro,snort");
+    Optional<String> scoreField = groupRequest.getScoreField();
+    if (scoreField.isPresent()) {
+      query.set("stats", true);
+      query.set("stats.field", String.format("{!tag=piv1 sum=true}%s", 
scoreField.get()));
+    }
+    query.set("facet", true);
+    query.set("facet.pivot", String.format("{!stats=piv1}%s", groupNames));
+    try {
+      QueryResponse response = client.query(query);
+      return buildGroupResponse(groupRequest, response);
+    } catch (IOException | SolrServerException e) {
+      String msg = e.getMessage();
+      LOG.error(msg, e);
+      throw new InvalidSearchException(msg, e);
+    }
+  }
+
+  @Override
+  public Document getLatest(String guid, String collection) throws IOException 
{
+    try {
+      SolrDocument solrDocument = client.getById(collection, guid);
+      return toDocument(solrDocument);
+    } catch (SolrServerException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public Iterable<Document> getAllLatest(List<GetRequest> getRequests) throws 
IOException {
+    Map<String, Collection<String>> collectionIdMap = new HashMap<>();
+    for (GetRequest getRequest: getRequests) {
+      Collection<String> ids = 
collectionIdMap.getOrDefault(getRequest.getSensorType(), new HashSet<>());
+      ids.add(getRequest.getGuid());
+      collectionIdMap.put(getRequest.getSensorType(), ids);
+    }
+    try {
+      List<Document> documents = new ArrayList<>();
+      for (String collection: collectionIdMap.keySet()) {
+        SolrDocumentList solrDocumentList = 
client.getById(collectionIdMap.get(collection),
+            new SolrQuery().set("collection", collection));
+        
documents.addAll(solrDocumentList.stream().map(this::toDocument).collect(Collectors.toList()));
+      }
+      return documents;
+    } catch (SolrServerException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private SolrQuery buildSearchRequest(
+      SearchRequest searchRequest) {
+    SolrQuery query = new SolrQuery()
+        .setStart(searchRequest.getFrom())
+        .setRows(searchRequest.getSize())
+        .setQuery(searchRequest.getQuery());
+
+    // handle sort fields
+    for (SortField sortField : searchRequest.getSort()) {
+      query.addSort(sortField.getField(), 
getSolrSortOrder(sortField.getSortOrder()));
+    }
+
+    // handle search fields
+    Optional<List<String>> fields = searchRequest.getFields();
+    if (fields.isPresent()) {
+      fields.get().forEach(query::addField);
+    }
+
+    //handle facet fields
+    Optional<List<String>> facetFields = searchRequest.getFacetFields();
+    if (facetFields.isPresent()) {
+      facetFields.get().forEach(query::addFacetField);
+    }
+
+    String collections = 
searchRequest.getIndices().stream().collect(Collectors.joining(","));
+    query.set("collection", collections);
+
+    return query;
+  }
+
+  private SolrQuery.ORDER getSolrSortOrder(
+      SortOrder sortOrder) {
+    return sortOrder == SortOrder.DESC ?
+        ORDER.desc : ORDER.asc;
+  }
+
+  private SearchResponse buildSearchResponse(
+      SearchRequest searchRequest,
+      QueryResponse solrResponse) {
+
+    SearchResponse searchResponse = new SearchResponse();
+    SolrDocumentList solrDocumentList = solrResponse.getResults();
+    searchResponse.setTotal(solrDocumentList.getNumFound());
+
+    // search hits --> search results
+    List<SearchResult> results = solrDocumentList.stream()
+        .map(solrDocument -> getSearchResult(solrDocument, 
searchRequest.getFields()))
+        .collect(Collectors.toList());
+    searchResponse.setResults(results);
+
+    // handle facet fields
+    Optional<List<String>> facetFields = searchRequest.getFacetFields();
+    if (facetFields.isPresent()) {
+      searchResponse.setFacetCounts(getFacetCounts(facetFields.get(), 
solrResponse));
+    }
+
+    if (LOG.isDebugEnabled()) {
+      String response;
+      try {
+        response = JSONUtils.INSTANCE.toJSON(searchResponse, false);
+      } catch (JsonProcessingException e) {
+        response = e.getMessage();
+      }
+      LOG.debug("Built search response; response={}", response);
+    }
+    return searchResponse;
+  }
+
+  private SearchResult getSearchResult(SolrDocument solrDocument, 
Optional<List<String>> fields) {
+    SearchResult searchResult = new SearchResult();
+    searchResult.setId((String) solrDocument.getFieldValue(Constants.GUID));
+    Map<String, Object> source;
+    if (fields.isPresent()) {
+      source = new HashMap<>();
+      fields.get().forEach(field -> source.put(field, 
solrDocument.getFieldValue(field)));
+    } else {
+      source = solrDocument.getFieldValueMap();
+    }
+    searchResult.setSource(source);
+    return searchResult;
+  }
+
+  private Map<String, Map<String, Long>> getFacetCounts(List<String> fields,
+      QueryResponse solrResponse) {
+    Map<String, Map<String, Long>> fieldCounts = new HashMap<>();
+    for (String field : fields) {
+      Map<String, Long> valueCounts = new HashMap<>();
+      FacetField facetField = solrResponse.getFacetField(field);
+      for (Count facetCount : facetField.getValues()) {
+        valueCounts.put(facetCount.getName(), facetCount.getCount());
+      }
+      fieldCounts.put(field, valueCounts);
+    }
+    return fieldCounts;
+  }
+
+  /**
+   * Build a group response.
+   * @param groupRequest The original group request.
+   * @param response The search response.
+   * @return A group response.
+   */
+  private GroupResponse buildGroupResponse(
+      GroupRequest groupRequest,
+      QueryResponse response) {
+    String groupNames = 
groupRequest.getGroups().stream().map(Group::getField).collect(
+        Collectors.joining(","));
+    List<PivotField> pivotFields = response.getFacetPivot().get(groupNames);
+    GroupResponse groupResponse = new GroupResponse();
+    groupResponse.setGroupedBy(groupRequest.getGroups().get(0).getField());
+    groupResponse.setGroupResults(getGroupResults(groupRequest, 0, 
pivotFields));
+    return groupResponse;
+  }
+
+  private List<GroupResult> getGroupResults(GroupRequest groupRequest, int 
index, List<PivotField> pivotFields) {
+    List<Group> groups = groupRequest.getGroups();
+    List<GroupResult> searchResultGroups = new ArrayList<>();
+    final GroupOrder groupOrder = groups.get(index).getOrder();
+    pivotFields.sort((o1, o2) -> {
+      String s1 = groupOrder.getGroupOrderType() == GroupOrderType.TERM ?
+          o1.getValue().toString() : Integer.toString(o1.getCount());
+      String s2 = groupOrder.getGroupOrderType() == GroupOrderType.TERM ?
+          o2.getValue().toString() : Integer.toString(o2.getCount());
+      if (groupOrder.getSortOrder() == SortOrder.ASC) {
+        return s1.compareTo(s2);
+      } else {
+        return s2.compareTo(s1);
+      }
+    });
+
+    for(PivotField pivotField: pivotFields) {
+      GroupResult groupResult = new GroupResult();
+      groupResult.setKey(pivotField.getValue().toString());
+      groupResult.setTotal(pivotField.getCount());
+      Optional<String> scoreField = groupRequest.getScoreField();
+      if (scoreField.isPresent()) {
+        groupResult.setScore((Double) 
pivotField.getFieldStatsInfo().get("score").getSum());
+      }
+      if (index < groups.size() - 1) {
+        groupResult.setGroupedBy(groups.get(index + 1).getField());
+        groupResult.setGroupResults(getGroupResults(groupRequest, index + 1, 
pivotField.getPivot()));
+      }
+      searchResultGroups.add(groupResult);
+    }
+    return searchResultGroups;
+  }
+
+  private Document toDocument(SolrDocument solrDocument) {
+    Map<String, Object> document = new HashMap<>();
+    solrDocument.getFieldNames().stream()
+        .filter(name -> !name.equals(SolrDao.VERSION_FIELD))
+        .forEach(name -> document.put(name, solrDocument.getFieldValue(name)));
+    return new Document(document,
+        (String) solrDocument.getFieldValue(Constants.GUID),
+        (String) solrDocument.getFieldValue("source:type"), 0L);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrUpdateDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrUpdateDao.java
 
b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrUpdateDao.java
new file mode 100644
index 0000000..f25253d
--- /dev/null
+++ 
b/metron-platform/metron-solr/src/main/java/org/apache/metron/solr/dao/SolrUpdateDao.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.solr.dao;
+
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import org.apache.metron.indexing.dao.update.Document;
+import org.apache.metron.indexing.dao.update.UpdateDao;
+import org.apache.solr.client.solrj.SolrClient;
+import org.apache.solr.client.solrj.SolrServerException;
+import org.apache.solr.common.SolrInputDocument;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SolrUpdateDao implements UpdateDao {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  private transient SolrClient client;
+
+  public SolrUpdateDao(SolrClient client) {
+    this.client = client;
+  }
+
+  @Override
+  public void update(Document update, Optional<String> index) throws 
IOException {
+    try {
+      SolrInputDocument solrInputDocument = toSolrInputDocument(update);
+      if (index.isPresent()) {
+        this.client.add(index.get(), solrInputDocument);
+      } else {
+        this.client.add(solrInputDocument);
+      }
+    } catch (SolrServerException e) {
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void batchUpdate(Map<Document, Optional<String>> updates) throws 
IOException {
+    // updates with a collection specified
+    Map<String, Collection<SolrInputDocument>> solrCollectionUpdates = new 
HashMap<>();
+
+    // updates with no collection specified
+    Collection<SolrInputDocument> solrUpdates = new ArrayList<>();
+
+    for(Entry<Document, Optional<String>> entry: updates.entrySet()) {
+      SolrInputDocument solrInputDocument = 
toSolrInputDocument(entry.getKey());
+      Optional<String> index = entry.getValue();
+      if (index.isPresent()) {
+        Collection<SolrInputDocument> solrInputDocuments = 
solrCollectionUpdates.get(index.get());
+        if (solrInputDocuments == null) {
+          solrInputDocuments = new ArrayList<>();
+        }
+        solrInputDocuments.add(solrInputDocument);
+        solrCollectionUpdates.put(index.get(), solrInputDocuments);
+      } else {
+        solrUpdates.add(solrInputDocument);
+      }
+    }
+    try {
+      if (!solrCollectionUpdates.isEmpty()) {
+        for(Entry<String, Collection<SolrInputDocument>> entry: 
solrCollectionUpdates.entrySet()) {
+          this.client.add(entry.getKey(), entry.getValue());
+        }
+      } else {
+        this.client.add(solrUpdates);
+      }
+    } catch (SolrServerException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private SolrInputDocument toSolrInputDocument(Document document) {
+    SolrInputDocument solrInputDocument = new SolrInputDocument();
+    document.getDocument().forEach(solrInputDocument::addField);
+    return solrInputDocument;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
 
b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
index 7c907fd..09e88a4 100644
--- 
a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
+++ 
b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrIndexingIntegrationTest.java
@@ -18,6 +18,10 @@
 package org.apache.metron.solr.integration;
 
 import com.google.common.base.Function;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import javax.annotation.Nullable;
 import org.apache.metron.common.configuration.Configurations;
 import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.common.interfaces.FieldNameConverter;
@@ -33,12 +37,6 @@ import 
org.apache.metron.integration.components.KafkaComponent;
 import org.apache.metron.integration.components.ZKServerComponent;
 import org.apache.metron.solr.integration.components.SolrComponent;
 
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
 public class SolrIndexingIntegrationTest extends IndexingIntegrationTest {
 
   private String collection = "metron";

http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrSearchIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrSearchIntegrationTest.java
 
b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrSearchIntegrationTest.java
new file mode 100644
index 0000000..a9ce650
--- /dev/null
+++ 
b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrSearchIntegrationTest.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.solr.integration;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.indexing.dao.AccessConfig;
+import org.apache.metron.indexing.dao.IndexDao;
+import org.apache.metron.indexing.dao.SearchIntegrationTest;
+import org.apache.metron.indexing.dao.search.FieldType;
+import org.apache.metron.indexing.dao.search.InvalidSearchException;
+import org.apache.metron.indexing.dao.search.SearchRequest;
+import org.apache.metron.indexing.dao.search.SearchResponse;
+import org.apache.metron.integration.InMemoryComponent;
+import org.apache.metron.solr.dao.SolrDao;
+import org.apache.metron.solr.integration.components.SolrComponent;
+import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.json.simple.JSONArray;
+import org.json.simple.parser.JSONParser;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class SolrSearchIntegrationTest extends SearchIntegrationTest {
+
+  private SolrComponent solrComponent;
+
+  @Override
+  protected IndexDao createDao() throws Exception {
+    AccessConfig config = new AccessConfig();
+    config.setMaxSearchResults(100);
+    config.setMaxSearchGroups(100);
+    config.setGlobalConfigSupplier( () ->
+        new HashMap<String, Object>() {{
+          put("solr.zookeeper", solrComponent.getZookeeperUrl());
+        }}
+    );
+
+    IndexDao dao = new SolrDao();
+    dao.init(config);
+    return dao;
+  }
+
+  @Override
+  protected InMemoryComponent startIndex() throws Exception {
+    solrComponent = new SolrComponent.Builder()
+        .addCollection("bro", 
"../metron-solr/src/test/resources/config/bro/conf")
+        .addCollection("snort", 
"../metron-solr/src/test/resources/config/snort/conf")
+        .build();
+    solrComponent.start();
+    return solrComponent;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  protected void loadTestData() throws Exception {
+    CloudSolrClient solrClient = solrComponent.getSolrClient();
+    JSONArray broArray = (JSONArray) new JSONParser().parse(broData);
+    solrComponent.addDocs("bro", broArray);
+    solrClient.setDefaultCollection("bro");
+    solrClient.commit();
+    JSONArray snortArray = (JSONArray) new JSONParser().parse(snortData);
+    solrComponent.addDocs("snort", snortArray);
+    solrClient.setDefaultCollection("snort");
+    solrClient.commit();
+  }
+
+  @Override
+  public void returns_column_metadata_for_specified_indices() throws Exception 
{
+    // getColumnMetadata with only bro
+    {
+      Map<String, FieldType> fieldTypes = 
dao.getColumnMetadata(Collections.singletonList("bro"));
+      Assert.assertEquals(12, fieldTypes.size());
+      Assert.assertEquals(FieldType.TEXT, fieldTypes.get("bro_field"));
+      Assert.assertEquals(FieldType.TEXT, 
fieldTypes.get("duplicate_name_field"));
+      Assert.assertEquals(FieldType.TEXT, fieldTypes.get("guid"));
+      Assert.assertEquals(FieldType.TEXT, fieldTypes.get("source:type"));
+      Assert.assertEquals(FieldType.TEXT, fieldTypes.get("ip_src_addr"));
+      Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port"));
+      Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field"));
+      Assert.assertEquals(FieldType.LONG, fieldTypes.get("timestamp"));
+      Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude"));
+      Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score"));
+      Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert"));
+      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point"));
+      Assert.assertEquals(FieldType.TEXT, fieldTypes.get("bro_field"));
+      Assert.assertEquals(FieldType.TEXT, 
fieldTypes.get("duplicate_name_field"));
+    }
+    // getColumnMetadata with only snort
+    {
+      Map<String, FieldType> fieldTypes = 
dao.getColumnMetadata(Collections.singletonList("snort"));
+      Assert.assertEquals(13, fieldTypes.size());
+      Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("snort_field"));
+      Assert.assertEquals(FieldType.INTEGER, 
fieldTypes.get("duplicate_name_field"));
+      Assert.assertEquals(FieldType.TEXT, fieldTypes.get("guid"));
+      Assert.assertEquals(FieldType.TEXT, fieldTypes.get("source:type"));
+      Assert.assertEquals(FieldType.TEXT, fieldTypes.get("ip_src_addr"));
+      Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port"));
+      Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field"));
+      Assert.assertEquals(FieldType.LONG, fieldTypes.get("timestamp"));
+      Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude"));
+      Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score"));
+      Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert"));
+      Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point"));
+      Assert.assertEquals(FieldType.INTEGER, 
fieldTypes.get("duplicate_name_field"));
+    }
+  }
+
+  @Override
+  public void returns_column_data_for_multiple_indices() throws Exception {
+    Map<String, FieldType> fieldTypes = 
dao.getColumnMetadata(Arrays.asList("bro", "snort"));
+    Assert.assertEquals(14, fieldTypes.size());
+    Assert.assertEquals(FieldType.TEXT, fieldTypes.get("guid"));
+    Assert.assertEquals(FieldType.TEXT, fieldTypes.get("source:type"));
+    Assert.assertEquals(FieldType.TEXT, fieldTypes.get("ip_src_addr"));
+    Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("ip_src_port"));
+    Assert.assertEquals(FieldType.LONG, fieldTypes.get("long_field"));
+    Assert.assertEquals(FieldType.LONG, fieldTypes.get("timestamp"));
+    Assert.assertEquals(FieldType.FLOAT, fieldTypes.get("latitude"));
+    Assert.assertEquals(FieldType.DOUBLE, fieldTypes.get("score"));
+    Assert.assertEquals(FieldType.BOOLEAN, fieldTypes.get("is_alert"));
+    Assert.assertEquals(FieldType.OTHER, fieldTypes.get("location_point"));
+    Assert.assertEquals(FieldType.TEXT, fieldTypes.get("bro_field"));
+    Assert.assertEquals(FieldType.INTEGER, fieldTypes.get("snort_field"));
+    //NOTE: This is because the field is in both bro and snort and they have 
different types.
+    Assert.assertEquals(FieldType.OTHER, 
fieldTypes.get("duplicate_name_field"));
+    Assert.assertEquals(FieldType.FLOAT, 
fieldTypes.get("threat:triage:score"));
+  }
+
+  @Test
+  public void different_type_filter_query() throws Exception {
+    thrown.expect(InvalidSearchException.class);
+    SearchRequest request = JSONUtils.INSTANCE.load(differentTypeFilterQuery, 
SearchRequest.class);
+    SearchResponse response = dao.search(request);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrUpdateIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrUpdateIntegrationTest.java
 
b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrUpdateIntegrationTest.java
new file mode 100644
index 0000000..faa4ec4
--- /dev/null
+++ 
b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrUpdateIntegrationTest.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.solr.integration;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.metron.indexing.dao.IndexDao;
+import org.apache.metron.indexing.dao.UpdateIntegrationTest;
+import org.apache.metron.integration.InMemoryComponent;
+import org.apache.metron.solr.dao.SolrDao;
+import org.apache.metron.solr.integration.components.SolrComponent;
+
+public class SolrUpdateIntegrationTest extends UpdateIntegrationTest {
+
+  SolrComponent solrComponent;
+
+  @Override
+  protected String getIndexName() {
+    return SENSOR_NAME;
+  }
+
+  @Override
+  protected Map<String, Object> createGlobalConfig() throws Exception {
+    return new HashMap<String, Object>() {{
+      put("solr.zookeeper", solrComponent.getZookeeperUrl());
+    }};
+  }
+
+  @Override
+  protected IndexDao createDao() throws Exception {
+    return new SolrDao();
+  }
+
+  @Override
+  protected InMemoryComponent startIndex() throws Exception {
+    solrComponent = new SolrComponent.Builder().addCollection(SENSOR_NAME, 
"../metron-solr/src/test/resources/config/test/conf").build();
+    solrComponent.start();
+    return solrComponent;
+  }
+
+  @Override
+  protected void loadTestData() throws Exception {
+
+  }
+
+  @Override
+  protected void addTestData(String indexName, String sensorType,
+      List<Map<String, Object>> docs) throws Exception {
+    solrComponent.addDocs(indexName, docs);
+  }
+
+  @Override
+  protected List<Map<String, Object>> getIndexedTestData(String indexName, 
String sensorType)
+      throws Exception {
+    return solrComponent.getAllIndexedDocs(indexName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/23113a63/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/components/SolrComponent.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/components/SolrComponent.java
 
b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/components/SolrComponent.java
index 58976a3..85d14f4 100644
--- 
a/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/components/SolrComponent.java
+++ 
b/metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/components/SolrComponent.java
@@ -18,6 +18,8 @@
 package org.apache.metron.solr.integration.components;
 
 import com.google.common.base.Function;
+import java.util.Collection;
+import java.util.stream.Collectors;
 import org.apache.metron.integration.InMemoryComponent;
 import org.apache.metron.integration.UnableToStartException;
 import org.apache.metron.solr.writer.MetronSolrClient;
@@ -25,6 +27,7 @@ import org.apache.solr.client.solrj.SolrQuery;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.embedded.JettyConfig;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
+import org.apache.solr.client.solrj.request.CollectionAdminRequest;
 import org.apache.solr.client.solrj.response.QueryResponse;
 import org.apache.solr.cloud.MiniSolrCloudCluster;
 import org.apache.solr.common.SolrDocument;
@@ -36,6 +39,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.solr.common.SolrInputDocument;
 
 public class SolrComponent implements InMemoryComponent {
 
@@ -89,12 +93,12 @@ public class SolrComponent implements InMemoryComponent {
     try {
       File baseDir = Files.createTempDirectory("solrcomponent").toFile();
       baseDir.deleteOnExit();
-      miniSolrCloudCluster = new MiniSolrCloudCluster(1, baseDir, new 
File(solrXmlPath), JettyConfig.builder().setPort(port).build());
+      miniSolrCloudCluster = new MiniSolrCloudCluster(1, baseDir.toPath(), 
JettyConfig.builder().setPort(port).build());
       for(String name: collections.keySet()) {
         String configPath = collections.get(name);
-        miniSolrCloudCluster.uploadConfigDir(new File(configPath), name);
+        miniSolrCloudCluster.uploadConfigSet(new File(configPath).toPath(), 
name);
+        CollectionAdminRequest.createCollection(name, 1, 
1).process(miniSolrCloudCluster.getSolrClient());
       }
-      miniSolrCloudCluster.createCollection("metron", 1, 1, "metron", new 
HashMap<String, String>());
       if (postStartCallback != null) postStartCallback.apply(this);
     } catch(Exception e) {
       throw new UnableToStartException(e.getMessage(), e);
@@ -158,4 +162,16 @@ public class SolrComponent implements InMemoryComponent {
     }
     return docs;
   }
+
+  public void addDocs(String collection, List<Map<String, Object>> docs)
+      throws IOException, SolrServerException {
+    CloudSolrClient solr = miniSolrCloudCluster.getSolrClient();
+    solr.setDefaultCollection(collection);
+    Collection<SolrInputDocument> solrInputDocuments = docs.stream().map(doc 
-> {
+      SolrInputDocument solrInputDocument = new SolrInputDocument();
+      doc.forEach(solrInputDocument::addField);
+      return solrInputDocument;
+    }).collect(Collectors.toList());
+    solr.add(collection, solrInputDocuments);
+  }
 }

Reply via email to