http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java
 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java
index c61efc5..60d009f 100644
--- 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java
+++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/JSONUtils.java
@@ -27,6 +27,7 @@ import org.json.simple.parser.JSONParser;
 import org.json.simple.parser.ParseException;
 
 import java.io.*;
+import java.util.Map;
 
 public enum JSONUtils {
   INSTANCE;
@@ -37,6 +38,15 @@ public enum JSONUtils {
   private static ThreadLocal<ObjectMapper> _mapper = 
ThreadLocal.withInitial(() ->
           new 
ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL));
 
+  public <T> T convert(Object original, Class<T> targetClass) {
+    return _mapper.get().convertValue(original, targetClass);
+  }
+
+  public ObjectMapper getMapper() {
+    return _mapper.get();
+  }
+
+
   public <T> T load(InputStream is, TypeReference<T> ref) throws IOException {
     return _mapper.get().readValue(is, ref);
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-data-management/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-data-management/pom.xml 
b/metron-platform/metron-data-management/pom.xml
index fbfd71a..90c2c52 100644
--- a/metron-platform/metron-data-management/pom.xml
+++ b/metron-platform/metron-data-management/pom.xml
@@ -115,6 +115,13 @@
             <version>${project.parent.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-hbase</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
             <groupId>org.mitre.taxii</groupId>
             <artifactId>taxii</artifactId>
             <version>1.1.0.1</version>

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiIntegrationTest.java
 
b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiIntegrationTest.java
index 0223514..f94a02a 100644
--- 
a/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiIntegrationTest.java
+++ 
b/metron-platform/metron-data-management/src/test/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiIntegrationTest.java
@@ -31,8 +31,9 @@ import 
org.apache.metron.dataloads.extractor.stix.StixExtractor;
 import org.apache.metron.enrichment.converter.EnrichmentConverter;
 import org.apache.metron.enrichment.converter.EnrichmentKey;
 import org.apache.metron.enrichment.converter.EnrichmentValue;
-import org.apache.metron.test.mock.MockHTable;
 import org.apache.metron.enrichment.lookup.LookupKV;
+import org.apache.metron.hbase.mock.MockHTable;
+import org.apache.metron.hbase.mock.MockHBaseTableProvider;
 import org.junit.*;
 
 import java.io.IOException;
@@ -49,7 +50,7 @@ public class TaxiiIntegrationTest {
     @AfterClass
     public static void teardown() {
         MockTaxiiService.shutdown();
-        MockHTable.Provider.clear();
+        MockHBaseTableProvider.clear();
     }
 
     /**
@@ -91,7 +92,7 @@ public class TaxiiIntegrationTest {
     @Test
     public void testTaxii() throws Exception {
 
-        final MockHTable.Provider provider = new MockHTable.Provider();
+        final MockHBaseTableProvider provider = new MockHBaseTableProvider();
         final Configuration config = HBaseConfiguration.create();
         TaxiiHandler handler = new 
TaxiiHandler(TaxiiConnectionConfig.load(taxiiConnectionConfig), new 
StixExtractor(), config ) {
             @Override
@@ -115,7 +116,7 @@ public class TaxiiIntegrationTest {
         }
         Assert.assertTrue(maliciousAddresses.contains("94.102.53.142"));
         Assert.assertEquals(numStringsMatch(MockTaxiiService.pollMsg, 
"AddressObj:Address_Value condition=\"Equal\""), maliciousAddresses.size());
-        MockHTable.Provider.clear();
+        MockHBaseTableProvider.clear();
 
         // Ensure that the handler can be run multiple times without 
connection issues.
         handler.run();

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-elasticsearch/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/pom.xml 
b/metron-platform/metron-elasticsearch/pom.xml
index 6cef1e9..40989c6 100644
--- a/metron-platform/metron-elasticsearch/pom.xml
+++ b/metron-platform/metron-elasticsearch/pom.xml
@@ -73,6 +73,23 @@
             </exclusions>
         </dependency>
         <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-hbase</artifactId>
+            <version>${parent.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-client</artifactId>
             <version>${global_hbase_version}</version>

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
index 64bf4b6..217da84 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
@@ -17,10 +17,24 @@
  */
 package org.apache.metron.elasticsearch.dao;
 
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
+import org.apache.metron.indexing.dao.update.Document;
 import org.apache.metron.indexing.dao.IndexDao;
 import org.apache.metron.indexing.dao.search.*;
+import org.apache.metron.indexing.dao.search.SearchRequest;
+import org.apache.metron.indexing.dao.search.SearchResponse;
+import org.elasticsearch.action.get.GetRequestBuilder;
+import org.elasticsearch.action.get.GetResponse;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.*;
+import org.elasticsearch.action.update.UpdateRequest;
+import org.elasticsearch.action.update.UpdateResponse;
 import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsRequest;
 import org.elasticsearch.action.search.SearchPhaseExecutionException;
 import org.elasticsearch.client.transport.TransportClient;
@@ -36,6 +50,15 @@ import 
org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
 import org.elasticsearch.search.aggregations.bucket.terms.TermsBuilder;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.sort.*;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.elasticsearch.index.query.QueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.SearchHits;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Date;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -46,6 +69,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class ElasticsearchDao implements IndexDao {
@@ -120,6 +144,7 @@ public class ElasticsearchDao implements IndexDao {
       searchResult.setId(searchHit.getId());
       searchResult.setSource(searchHit.getSource());
       searchResult.setScore(searchHit.getScore());
+      searchResult.setIndex(searchHit.getIndex());
       return searchResult;
     }).collect(Collectors.toList()));
     if (facetFields.isPresent()) {
@@ -135,9 +160,92 @@ public class ElasticsearchDao implements IndexDao {
   }
 
   @Override
-  public void init(Map<String, Object> globalConfig, AccessConfig config) {
-    this.client = ElasticsearchUtils.getClient(globalConfig, 
config.getOptionalSettings());
-    this.accessConfig = config;
+  public synchronized void init(AccessConfig config) {
+    if(this.client == null) {
+      this.client = 
ElasticsearchUtils.getClient(config.getGlobalConfigSupplier().get(), 
config.getOptionalSettings());
+      this.accessConfig = config;
+    }
+  }
+
+  @Override
+  public Document getLatest(final String guid, final String sensorType) throws 
IOException {
+    Optional<Document> ret = searchByGuid(
+            guid
+            , sensorType
+            , hit -> {
+              Long ts = 0L;
+              String doc = hit.getSourceAsString();
+              String sourceType = 
Iterables.getFirst(Splitter.on("_doc").split(hit.getType()), null);
+              try {
+                return Optional.of(new Document(doc, guid, sourceType, ts));
+              } catch (IOException e) {
+                throw new IllegalStateException("Unable to retrieve latest: " 
+ e.getMessage(), e);
+              }
+            }
+            );
+    return ret.orElse(null);
+  }
+
+  /**
+   * Return the search hit based on the UUID and sensor type.
+   * A callback can be specified to transform the hit into a type T.
+   * If more than one hit happens, the first one will be returned.
+   * @throws IOException
+   */
+  <T> Optional<T> searchByGuid(String guid, String sensorType, 
Function<SearchHit, Optional<T>> callback) throws IOException{
+    QueryBuilder query =  QueryBuilders.matchQuery(Constants.GUID, guid);
+    SearchRequestBuilder request = client.prepareSearch()
+                                         .setTypes(sensorType + "_doc")
+                                         .setQuery(query)
+                                         .setSource("message")
+                                         ;
+    MultiSearchResponse response = client.prepareMultiSearch()
+                                         .add(request)
+                                         .get();
+    for(MultiSearchResponse.Item i : response) {
+      org.elasticsearch.action.search.SearchResponse resp = i.getResponse();
+      SearchHits hits = resp.getHits();
+      for(SearchHit hit : hits) {
+        Optional<T> ret = callback.apply(hit);
+        if(ret.isPresent()) {
+          return ret;
+        }
+      }
+    }
+    return Optional.empty();
+
+  }
+
+  @Override
+  public void update(Document update, Optional<String> index) throws 
IOException {
+    String indexPostfix = 
ElasticsearchUtils.getIndexFormat(accessConfig.getGlobalConfigSupplier().get()).format(new
 Date());
+    String sensorType = update.getSensorType();
+    String indexName = ElasticsearchUtils.getIndexName(sensorType, 
indexPostfix, null);
+
+    String type = sensorType + "_doc";
+    Object ts = update.getTimestamp();
+    IndexRequest indexRequest = new IndexRequest(indexName, type, 
update.getGuid())
+            .source(update.getDocument())
+            ;
+    if(ts != null) {
+      indexRequest = indexRequest.timestamp(ts.toString());
+    }
+    String existingIndex = index.orElse(
+            searchByGuid(update.getGuid()
+                        , sensorType
+                        , hit -> Optional.ofNullable(hit.getIndex())
+                        ).orElse(indexName)
+                                       );
+    UpdateRequest updateRequest = new UpdateRequest(existingIndex, type, 
update.getGuid())
+            .doc(update.getDocument())
+            .upsert(indexRequest)
+            ;
+
+    try {
+      client.update(updateRequest).get();
+    } catch (Exception e) {
+      throw new IOException(e.getMessage(), e);
+    }
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
index d199403..c7c4d90 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
@@ -20,6 +20,7 @@ package org.apache.metron.elasticsearch.utils;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import org.apache.metron.common.configuration.writer.WriterConfiguration;
 import org.apache.metron.elasticsearch.writer.ElasticsearchWriter;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.settings.Settings;
@@ -27,14 +28,32 @@ import 
org.elasticsearch.common.transport.InetSocketTransportAddress;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import java.text.SimpleDateFormat;
+import java.util.*;
 
 public class ElasticsearchUtils {
 
+  private static ThreadLocal<Map<String, SimpleDateFormat>> DATE_FORMAT_CACHE
+          = ThreadLocal.withInitial(() -> new HashMap<>());
+
+  public static SimpleDateFormat getIndexFormat(WriterConfiguration 
configurations) {
+    return getIndexFormat(configurations.getGlobalConfig());
+  }
+
+  public static SimpleDateFormat getIndexFormat(Map<String, Object> 
globalConfig) {
+    String format = (String) globalConfig.get("es.date.format");
+    return DATE_FORMAT_CACHE.get().computeIfAbsent(format, 
SimpleDateFormat::new);
+  }
+
+  public static String getIndexName(String sensorType, String indexPostfix, 
WriterConfiguration configurations) {
+    String indexName = sensorType;
+    if (configurations != null) {
+      indexName = configurations.getIndex(sensorType);
+    }
+    indexName = indexName + "_index_" + indexPostfix;
+    return indexName;
+  }
+
   public static TransportClient getClient(Map<String, Object> 
globalConfiguration, Map<String, String> optionalSettings) {
     Settings.Builder settingsBuilder = Settings.settingsBuilder();
     settingsBuilder.put("cluster.name", 
globalConfiguration.get("es.clustername"));

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
index 39cdda9..dd32532 100644
--- 
a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
+++ 
b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
@@ -17,6 +17,13 @@
  */
 package org.apache.metron.elasticsearch.writer;
 
+import org.apache.metron.common.Constants;
+import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Tuple;
+import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
 import java.io.Serializable;
 import java.text.SimpleDateFormat;
 import java.util.Date;
@@ -86,6 +93,11 @@ public class ElasticsearchWriter implements 
BulkMessageWriter<JSONObject>, Seria
               sensorType + "_doc");
 
       indexRequestBuilder = 
indexRequestBuilder.setSource(esDoc.toJSONString());
+      String guid = (String)esDoc.get(Constants.GUID);
+      if(guid != null) {
+        indexRequestBuilder.setId(guid);
+      }
+
       Object ts = esDoc.get("timestamp");
       if(ts != null) {
         indexRequestBuilder = indexRequestBuilder.setTimestamp(ts.toString());

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchDaoIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchDaoIntegrationTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchDaoIntegrationTest.java
deleted file mode 100644
index ffc41b3..0000000
--- 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchDaoIntegrationTest.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.elasticsearch.integration;
-
-
-import org.adrianwalker.multilinestring.Multiline;
-import org.apache.metron.elasticsearch.dao.ElasticsearchDao;
-import 
org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
-import org.apache.metron.indexing.dao.AccessConfig;
-import org.apache.metron.indexing.dao.IndexDao;
-import org.apache.metron.indexing.dao.IndexingDaoIntegrationTest;
-import org.apache.metron.integration.InMemoryComponent;
-import org.elasticsearch.action.bulk.BulkRequestBuilder;
-import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.index.IndexRequestBuilder;
-import org.json.simple.JSONArray;
-import org.json.simple.JSONObject;
-import org.json.simple.parser.JSONParser;
-import org.json.simple.parser.ParseException;
-import org.junit.Before;
-import org.junit.BeforeClass;
-
-import java.io.File;
-import java.util.HashMap;
-
-public class ElasticsearchDaoIntegrationTest extends 
IndexingDaoIntegrationTest {
-  private static String indexDir = "target/elasticsearch_search";
-  private static String dateFormat = "yyyy.MM.dd.HH";
-
-  /**
-   * {
-   * "bro_doc": {
-   *   "properties": {
-   *     "source:type": { "type": "string" },
-   *     "ip_src_addr": { "type": "ip" },
-   *     "ip_src_port": { "type": "integer" },
-   *     "long_field": { "type": "long" },
-   *     "timestamp" : { "type": "date" },
-   *     "latitude" : { "type": "float" },
-   *     "double_field": { "type": "double" },
-   *     "is_alert": { "type": "boolean" },
-   *     "location_point": { "type": "geo_point" },
-   *     "bro_field": { "type": "string" },
-   *     "duplicate_name_field": { "type": "string" }
-   *   }
-   * }
-   * }
-   */
-  @Multiline
-  private static String broTypeMappings;
-
-  /**
-   * {
-   * "snort_doc": {
-   *   "properties": {
-   *     "source:type": { "type": "string" },
-   *     "ip_src_addr": { "type": "ip" },
-   *     "ip_src_port": { "type": "integer" },
-   *     "long_field": { "type": "long" },
-   *     "timestamp" : { "type": "date" },
-   *     "latitude" : { "type": "float" },
-   *     "double_field": { "type": "double" },
-   *     "is_alert": { "type": "boolean" },
-   *     "location_point": { "type": "geo_point" },
-   *     "snort_field": { "type": "integer" },
-   *     "duplicate_name_field": { "type": "integer" }
-   *   }
-   * }
-   * }
-   */
-  @Multiline
-  private static String snortTypeMappings;
-
-
-  @Override
-  protected IndexDao createDao() throws Exception {
-    IndexDao ret = new ElasticsearchDao();
-    ret.init(
-            new HashMap<String, Object>() {{
-              put("es.clustername", "metron");
-              put("es.port", "9300");
-              put("es.ip", "localhost");
-              put("es.date.format", dateFormat);
-            }},
-            new AccessConfig() {{
-              setMaxSearchResults(100);
-            }}
-    );
-    return ret;
-  }
-
-  @Override
-  protected InMemoryComponent startIndex() throws Exception {
-    InMemoryComponent es = new ElasticSearchComponent.Builder()
-            .withHttpPort(9211)
-            .withIndexDir(new File(indexDir))
-            .build();
-    es.start();
-    return es;
-  }
-
-  @Override
-  protected void loadTestData() throws ParseException {
-    ElasticSearchComponent es = (ElasticSearchComponent)indexComponent;
-    es.getClient().admin().indices().prepareCreate("bro_index_2017.01.01.01")
-            .addMapping("bro_doc", broTypeMappings).get();
-    es.getClient().admin().indices().prepareCreate("snort_index_2017.01.01.02")
-            .addMapping("snort_doc", snortTypeMappings).get();
-
-    BulkRequestBuilder bulkRequest = 
es.getClient().prepareBulk().setRefresh(true);
-    JSONArray broArray = (JSONArray) new JSONParser().parse(broData);
-    for(Object o: broArray) {
-      JSONObject jsonObject = (JSONObject) o;
-      IndexRequestBuilder indexRequestBuilder = 
es.getClient().prepareIndex("bro_index_2017.01.01.01", "bro_doc");
-      indexRequestBuilder = 
indexRequestBuilder.setSource(jsonObject.toJSONString());
-      indexRequestBuilder = 
indexRequestBuilder.setTimestamp(jsonObject.get("timestamp").toString());
-      bulkRequest.add(indexRequestBuilder);
-    }
-    JSONArray snortArray = (JSONArray) new JSONParser().parse(snortData);
-    for(Object o: snortArray) {
-      JSONObject jsonObject = (JSONObject) o;
-      IndexRequestBuilder indexRequestBuilder = 
es.getClient().prepareIndex("snort_index_2017.01.01.02", "snort_doc");
-      indexRequestBuilder = 
indexRequestBuilder.setSource(jsonObject.toJSONString());
-      indexRequestBuilder = 
indexRequestBuilder.setTimestamp(jsonObject.get("timestamp").toString());
-      bulkRequest.add(indexRequestBuilder);
-    }
-    BulkResponse bulkResponse = bulkRequest.execute().actionGet();
-    if (bulkResponse.hasFailures()) {
-      throw new RuntimeException("Failed to index test data");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
new file mode 100644
index 0000000..d794ac9
--- /dev/null
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchSearchIntegrationTest.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.integration;
+
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.elasticsearch.dao.ElasticsearchDao;
+import 
org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
+import org.apache.metron.indexing.dao.AccessConfig;
+import org.apache.metron.indexing.dao.IndexDao;
+import org.apache.metron.indexing.dao.SearchIntegrationTest;
+import org.apache.metron.integration.InMemoryComponent;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
+import org.json.simple.JSONArray;
+import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
+import org.json.simple.parser.ParseException;
+
+import java.io.File;
+import java.util.HashMap;
+
+public class ElasticsearchSearchIntegrationTest extends SearchIntegrationTest {
+  private static String indexDir = "target/elasticsearch_search";
+  private static String dateFormat = "yyyy.MM.dd.HH";
+
+  /**
+   * {
+   * "bro_doc": {
+   *   "properties": {
+   *     "source:type": { "type": "string" },
+   *     "ip_src_addr": { "type": "ip" },
+   *     "ip_src_port": { "type": "integer" },
+   *     "long_field": { "type": "long" },
+   *     "timestamp" : { "type": "date" },
+   *     "latitude" : { "type": "float" },
+   *     "double_field": { "type": "double" },
+   *     "is_alert": { "type": "boolean" },
+   *     "location_point": { "type": "geo_point" },
+   *     "bro_field": { "type": "string" },
+   *     "duplicate_name_field": { "type": "string" }
+   *   }
+   * }
+   * }
+   */
+  @Multiline
+  private static String broTypeMappings;
+
+  /**
+   * {
+   * "snort_doc": {
+   *   "properties": {
+   *     "source:type": { "type": "string" },
+   *     "ip_src_addr": { "type": "ip" },
+   *     "ip_src_port": { "type": "integer" },
+   *     "long_field": { "type": "long" },
+   *     "timestamp" : { "type": "date" },
+   *     "latitude" : { "type": "float" },
+   *     "double_field": { "type": "double" },
+   *     "is_alert": { "type": "boolean" },
+   *     "location_point": { "type": "geo_point" },
+   *     "snort_field": { "type": "integer" },
+   *     "duplicate_name_field": { "type": "integer" }
+   *   }
+   * }
+   * }
+   */
+  @Multiline
+  private static String snortTypeMappings;
+
+
+  @Override
+  protected IndexDao createDao() throws Exception {
+    IndexDao ret = new ElasticsearchDao();
+    ret.init(
+            new AccessConfig() {{
+              setMaxSearchResults(100);
+              setGlobalConfigSupplier( () ->
+                new HashMap<String, Object>() {{
+                  put("es.clustername", "metron");
+                  put("es.port", "9300");
+                  put("es.ip", "localhost");
+                  put("es.date.format", dateFormat);
+                  }}
+              );
+            }}
+    );
+    return ret;
+  }
+
+  @Override
+  protected InMemoryComponent startIndex() throws Exception {
+    InMemoryComponent es = new ElasticSearchComponent.Builder()
+            .withHttpPort(9211)
+            .withIndexDir(new File(indexDir))
+            .build();
+    es.start();
+    return es;
+  }
+
+  @Override
+  protected void loadTestData() throws ParseException {
+    ElasticSearchComponent es = (ElasticSearchComponent)indexComponent;
+    es.getClient().admin().indices().prepareCreate("bro_index_2017.01.01.01")
+            .addMapping("bro_doc", broTypeMappings).get();
+    es.getClient().admin().indices().prepareCreate("snort_index_2017.01.01.02")
+            .addMapping("snort_doc", snortTypeMappings).get();
+
+    BulkRequestBuilder bulkRequest = 
es.getClient().prepareBulk().setRefresh(true);
+    JSONArray broArray = (JSONArray) new JSONParser().parse(broData);
+    for(Object o: broArray) {
+      JSONObject jsonObject = (JSONObject) o;
+      IndexRequestBuilder indexRequestBuilder = 
es.getClient().prepareIndex("bro_index_2017.01.01.01", "bro_doc");
+      indexRequestBuilder = 
indexRequestBuilder.setSource(jsonObject.toJSONString());
+      indexRequestBuilder = 
indexRequestBuilder.setTimestamp(jsonObject.get("timestamp").toString());
+      bulkRequest.add(indexRequestBuilder);
+    }
+    JSONArray snortArray = (JSONArray) new JSONParser().parse(snortData);
+    for(Object o: snortArray) {
+      JSONObject jsonObject = (JSONObject) o;
+      IndexRequestBuilder indexRequestBuilder = 
es.getClient().prepareIndex("snort_index_2017.01.01.02", "snort_doc");
+      indexRequestBuilder = 
indexRequestBuilder.setSource(jsonObject.toJSONString());
+      indexRequestBuilder = 
indexRequestBuilder.setTimestamp(jsonObject.get("timestamp").toString());
+      bulkRequest.add(indexRequestBuilder);
+    }
+    BulkResponse bulkResponse = bulkRequest.execute().actionGet();
+    if (bulkResponse.hasFailures()) {
+      throw new RuntimeException("Failed to index test data");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
new file mode 100644
index 0000000..9a1d7a7
--- /dev/null
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/ElasticsearchUpdateIntegrationTest.java
@@ -0,0 +1,219 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.elasticsearch.integration;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.utils.JSONUtils;
+import org.apache.metron.elasticsearch.dao.ElasticsearchDao;
+import 
org.apache.metron.elasticsearch.integration.components.ElasticSearchComponent;
+import org.apache.metron.hbase.mock.MockHTable;
+import org.apache.metron.hbase.mock.MockHBaseTableProvider;
+import org.apache.metron.indexing.dao.*;
+import org.apache.metron.indexing.dao.update.Document;
+import org.apache.metron.indexing.dao.update.ReplaceRequest;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+
/**
 * Integration test for update (replace) operations against the composite
 * {@code MultiIndexDao}: a replace must be written through to BOTH the HBase
 * "modifications" table (which keeps the full update history) and the live
 * Elasticsearch index.
 *
 * <p>NOTE(review): this test binds an in-process Elasticsearch node to fixed
 * ports (9211 http, 9300 transport) — verify no port clashes with suites run
 * concurrently in CI.
 */
public class ElasticsearchUpdateIntegrationTest {
  // Polling parameters used while waiting for asynchronous ES indexing.
  private static final int MAX_RETRIES = 10;
  private static final int SLEEP_MS = 500;
  private static final String SENSOR_NAME= "test";
  // HBase table / column family recording each replacement (one column per update).
  private static final String TABLE_NAME = "modifications";
  private static final String CF = "p";
  private static String indexDir = "target/elasticsearch_mutation";
  private static String dateFormat = "yyyy.MM.dd.HH";
  // Index name follows the <sensor>_index_<date> convention used by the writers.
  private static String index = SENSOR_NAME + "_index_" + new SimpleDateFormat(dateFormat).format(new Date());
  private static MockHTable table;
  private static IndexDao esDao;
  private static IndexDao hbaseDao;
  private static MultiIndexDao dao;
  private static ElasticSearchComponent es;

  /**
   * Starts an embedded Elasticsearch node, registers a mock HBase table, and
   * wires both into the single MultiIndexDao under test.
   */
  @BeforeClass
  public static void setup() throws Exception {
    Configuration config = HBaseConfiguration.create();
    MockHBaseTableProvider tableProvider = new MockHBaseTableProvider();
    tableProvider.addToCache(TABLE_NAME, CF);
    table = (MockHTable)tableProvider.getTable(config, TABLE_NAME);
    // setup the client
    es = new ElasticSearchComponent.Builder()
            .withHttpPort(9211)
            .withIndexDir(new File(indexDir))
            .build();
    es.start();

    hbaseDao = new HBaseDao();
    AccessConfig accessConfig = new AccessConfig();
    accessConfig.setTableProvider(tableProvider);
    // Global config consumed by both DAOs: ES connection settings plus the
    // HBase table/CF used to persist updates.
    Map<String, Object> globalConfig = new HashMap<String, Object>() {{
      put("es.clustername", "metron");
      put("es.port", "9300");
      put("es.ip", "localhost");
      put("es.date.format", dateFormat);
      put(HBaseDao.HBASE_TABLE, TABLE_NAME);
      put(HBaseDao.HBASE_CF, CF);
    }};
    accessConfig.setGlobalConfigSupplier(() -> globalConfig);

    esDao = new ElasticsearchDao();

    dao = new MultiIndexDao(hbaseDao, esDao);
    dao.init(accessConfig);

  }

  /** Stops the embedded Elasticsearch node, if it was started. */
  @AfterClass
  public static void teardown() {
    if(es != null) {
      es.stop();
    }
  }



  /**
   * Seeds ten documents, then replaces the first document twice (first adding
   * a "new-field", then changing its value), asserting after each replace
   * that: (1) getLatest returns the replacement, (2) the HBase modifications
   * table holds the accumulated update history, and (3) Elasticsearch
   * eventually reflects the new field value.
   */
  @Test
  public void test() throws Exception {
    List<Map<String, Object>> inputData = new ArrayList<>();
    for(int i = 0; i < 10;++i) {
      final String name = "message" + i;
      inputData.add(
              new HashMap<String, Object>() {{
                put("source:type", SENSOR_NAME);
                put("name" , name);
                put("timestamp", System.currentTimeMillis());
                put(Constants.GUID, name);
              }}
                             );
    }
    es.add(index, SENSOR_NAME
          , Iterables.transform(inputData,
                    m -> {
                      try {
                        return JSONUtils.INSTANCE.toJSON(m, true);
                      } catch (JsonProcessingException e) {
                        throw new IllegalStateException(e.getMessage(), e);
                      }
                    }
                    )
    );
    // Poll until all ten seed documents are searchable (ES indexing is async).
    List<Map<String,Object>> docs = null;
    for(int t = 0;t < MAX_RETRIES;++t, Thread.sleep(SLEEP_MS)) {
      docs = es.getAllIndexedDocs(index, SENSOR_NAME + "_doc");
      if(docs.size() >= 10) {
        break;
      }
    }
    Assert.assertEquals(10, docs.size());
    //modify the first message and add a new field
    {
      Map<String, Object> message0 = new HashMap<String, Object>(inputData.get(0)) {{
        put("new-field", "metron");
      }};
      String guid = "" + message0.get(Constants.GUID);
      dao.replace(new ReplaceRequest(){{
        setReplacement(message0);
        setGuid(guid);
        setSensorType(SENSOR_NAME);
      }}, Optional.empty());
      // Exactly one row (keyed by guid) should exist in the modifications table.
      Assert.assertEquals(1, table.size());
      Document doc = dao.getLatest(guid, SENSOR_NAME);
      Assert.assertEquals(message0, doc.getDocument());
      {
        //ensure hbase is up to date
        Get g = new Get(guid.getBytes());
        Result r = table.get(g);
        NavigableMap<byte[], byte[]> columns = r.getFamilyMap(CF.getBytes());
        // One column == one recorded update; its value is the replacement JSON.
        Assert.assertEquals(1, columns.size());
        Assert.assertEquals(message0
                , JSONUtils.INSTANCE.load(new String(columns.lastEntry().getValue())
                        , new TypeReference<Map<String, Object>>() {})
        );
      }
      {
        //ensure ES is up-to-date
        long cnt = 0;
        for (int t = 0; t < MAX_RETRIES && cnt == 0; ++t, Thread.sleep(SLEEP_MS)) {
          docs = es.getAllIndexedDocs(index, SENSOR_NAME + "_doc");
          cnt = docs
                  .stream()
                  .filter(d -> message0.get("new-field").equals(d.get("new-field")))
                  .count();
        }
        Assert.assertNotEquals("Elasticsearch is not updated!", cnt, 0);
      }
    }
    //modify the same message and modify the new field
    {
      Map<String, Object> message0 = new HashMap<String, Object>(inputData.get(0)) {{
        put("new-field", "metron2");
      }};
      String guid = "" + message0.get(Constants.GUID);
      dao.replace(new ReplaceRequest(){{
        setReplacement(message0);
        setGuid(guid);
        setSensorType(SENSOR_NAME);
      }}, Optional.empty());
      Assert.assertEquals(1, table.size());
      Document doc = dao.getLatest(guid, SENSOR_NAME);
      Assert.assertEquals(message0, doc.getDocument());
      {
        //ensure hbase is up to date
        Get g = new Get(guid.getBytes());
        Result r = table.get(g);
        NavigableMap<byte[], byte[]> columns = r.getFamilyMap(CF.getBytes());
        // Two columns now: one per replace, so the update history is preserved.
        Assert.assertEquals(2, columns.size());
        // lastEntry is the newest update; firstEntry is the superseded one.
        Assert.assertEquals(message0, JSONUtils.INSTANCE.load(new String(columns.lastEntry().getValue())
                        , new TypeReference<Map<String, Object>>() {})
        );
        Assert.assertNotEquals(message0, JSONUtils.INSTANCE.load(new String(columns.firstEntry().getValue())
                        , new TypeReference<Map<String, Object>>() {})
        );
      }
      {
        //ensure ES is up-to-date
        long cnt = 0;
        for (int t = 0; t < MAX_RETRIES && cnt == 0; ++t,Thread.sleep(SLEEP_MS)) {
          docs = es.getAllIndexedDocs(index, SENSOR_NAME + "_doc");
          cnt = docs
                  .stream()
                  .filter(d -> message0.get("new-field").equals(d.get("new-field")))
                  .count();
        }

        Assert.assertNotEquals("Elasticsearch is not updated!", cnt, 0);
      }
    }
  }



}

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java
 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java
index 7766fe3..7facff5 100644
--- 
a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java
+++ 
b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/integration/components/ElasticSearchComponent.java
@@ -17,13 +17,18 @@
  */
 package org.apache.metron.elasticsearch.integration.components;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.commons.io.FileUtils;
+import org.apache.metron.common.utils.JSONUtils;
 import org.apache.metron.integration.InMemoryComponent;
 import org.apache.metron.integration.UnableToStartException;
 import org.elasticsearch.ElasticsearchTimeoutException;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
 import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
+import org.elasticsearch.action.bulk.BulkRequestBuilder;
+import org.elasticsearch.action.bulk.BulkResponse;
+import org.elasticsearch.action.index.IndexRequestBuilder;
 import org.elasticsearch.cluster.health.ClusterHealthStatus;
 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
 import org.elasticsearch.action.admin.indices.stats.IndicesStatsRequest;
@@ -39,6 +44,7 @@ import org.elasticsearch.search.SearchHit;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -90,6 +96,36 @@ public class ElasticSearchComponent implements 
InMemoryComponent {
         }
         dir.mkdirs();
     }
+
+    public BulkResponse add(String indexName, String sensorType, String... 
docs) throws IOException {
+        List<String> d = new ArrayList<>();
+        Collections.addAll(d, docs);
+        return add(indexName, sensorType, d);
+    }
+
+    public BulkResponse add(String indexName, String sensorType, 
Iterable<String> docs) throws IOException {
+        BulkRequestBuilder bulkRequest = getClient().prepareBulk();
+        for(String doc : docs) {
+            IndexRequestBuilder indexRequestBuilder = 
getClient().prepareIndex(indexName,
+                    sensorType + "_doc");
+
+            indexRequestBuilder = indexRequestBuilder.setSource(doc);
+            Map<String, Object> esDoc = JSONUtils.INSTANCE.load(doc, new 
TypeReference<Map<String, Object>>() {
+            });
+            Object ts = esDoc.get("timestamp");
+            if(ts != null) {
+                indexRequestBuilder = 
indexRequestBuilder.setTimestamp(ts.toString());
+            }
+            bulkRequest.add(indexRequestBuilder);
+        }
+
+        BulkResponse response = bulkRequest.execute().actionGet();
+        if(response.hasFailures()) {
+            throw new IOException(response.buildFailureMessage());
+        }
+        return response;
+    }
+
     @Override
     public void start() throws UnableToStartException {
         File logDir= new File(indexDir, "/logs");

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-enrichment/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-enrichment/pom.xml 
b/metron-platform/metron-enrichment/pom.xml
index e2749c5..37cb49f 100644
--- a/metron-platform/metron-enrichment/pom.xml
+++ b/metron-platform/metron-enrichment/pom.xml
@@ -62,6 +62,13 @@
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
+            <artifactId>metron-hbase</artifactId>
+            <version>${project.parent.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
             <artifactId>metron-profiler-client</artifactId>
             <version>${project.parent.version}</version>
         </dependency>

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java
 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java
index 35a90b7..974f8ab 100644
--- 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java
+++ 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/simplehbase/SimpleHBaseAdapterTest.java
@@ -26,7 +26,8 @@ import org.apache.metron.enrichment.converter.EnrichmentKey;
 import org.apache.metron.enrichment.converter.EnrichmentValue;
 import org.apache.metron.enrichment.lookup.EnrichmentLookup;
 import org.apache.metron.enrichment.converter.EnrichmentHelper;
-import org.apache.metron.test.mock.MockHTable;
+import org.apache.metron.hbase.mock.MockHTable;
+import org.apache.metron.hbase.mock.MockHBaseTableProvider;
 import org.apache.metron.enrichment.lookup.LookupKV;
 import org.apache.metron.enrichment.lookup.accesstracker.BloomAccessTracker;
 import 
org.apache.metron.enrichment.lookup.accesstracker.PersistentAccessTracker;
@@ -102,8 +103,8 @@ public class SimpleHBaseAdapterTest {
 
   @Before
   public void setup() throws Exception {
-    final MockHTable trackerTable = (MockHTable) 
MockHTable.Provider.addToCache(atTableName, cf);
-    final MockHTable hbaseTable = (MockHTable) 
MockHTable.Provider.addToCache(hbaseTableName, cf);
+    final MockHTable trackerTable = (MockHTable) 
MockHBaseTableProvider.addToCache(atTableName, cf);
+    final MockHTable hbaseTable = (MockHTable) 
MockHBaseTableProvider.addToCache(hbaseTableName, cf);
     EnrichmentHelper.INSTANCE.load(hbaseTable, cf, new 
ArrayList<LookupKV<EnrichmentKey, EnrichmentValue>>() {{
       add(new LookupKV<>(new EnrichmentKey(PLAYFUL_CLASSIFICATION_TYPE, 
"10.0.2.3")
                       , new EnrichmentValue(PLAYFUL_ENRICHMENT)

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java
 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java
index e96c7a7..e421edc 100644
--- 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java
+++ 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/adapters/threatintel/ThreatIntelAdapterTest.java
@@ -28,7 +28,8 @@ import org.apache.metron.enrichment.converter.EnrichmentKey;
 import org.apache.metron.enrichment.converter.EnrichmentValue;
 import org.apache.metron.enrichment.lookup.EnrichmentLookup;
 import org.apache.metron.enrichment.converter.EnrichmentHelper;
-import org.apache.metron.test.mock.MockHTable;
+import org.apache.metron.hbase.mock.MockHTable;
+import org.apache.metron.hbase.mock.MockHBaseTableProvider;
 import org.apache.metron.enrichment.lookup.LookupKV;
 import org.apache.metron.enrichment.lookup.accesstracker.BloomAccessTracker;
 import 
org.apache.metron.enrichment.lookup.accesstracker.PersistentAccessTracker;
@@ -98,8 +99,8 @@ public class ThreatIntelAdapterTest {
   @Before
   public void setup() throws Exception {
 
-    final MockHTable trackerTable = (MockHTable) 
MockHTable.Provider.addToCache(atTableName, cf);
-    final MockHTable threatIntelTable = (MockHTable) 
MockHTable.Provider.addToCache(threatIntelTableName, cf);
+    final MockHTable trackerTable = (MockHTable) 
MockHBaseTableProvider.addToCache(atTableName, cf);
+    final MockHTable threatIntelTable = (MockHTable) 
MockHBaseTableProvider.addToCache(threatIntelTableName, cf);
     EnrichmentHelper.INSTANCE.load(threatIntelTable, cf, new 
ArrayList<LookupKV<EnrichmentKey, EnrichmentValue>>() {{
       add(new LookupKV<>(new EnrichmentKey("10.0.2.3", "10.0.2.3"), new 
EnrichmentValue(new HashMap<>())));
     }});

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
index a9a2fea..f77f16e 100644
--- 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
+++ 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/EnrichmentIntegrationTest.java
@@ -25,8 +25,6 @@ import com.google.common.base.Predicates;
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.metron.TestConstants;
 import org.apache.metron.common.Constants;
 import org.apache.metron.common.utils.JSONUtils;
@@ -38,7 +36,8 @@ import 
org.apache.metron.enrichment.integration.components.ConfigUploadComponent
 import org.apache.metron.enrichment.lookup.LookupKV;
 import 
org.apache.metron.enrichment.lookup.accesstracker.PersistentBloomTrackerCreator;
 import org.apache.metron.enrichment.stellar.SimpleHBaseEnrichmentFunctions;
-import org.apache.metron.hbase.TableProvider;
+import org.apache.metron.hbase.mock.MockHTable;
+import org.apache.metron.hbase.mock.MockHBaseTableProvider;
 import org.apache.metron.integration.BaseIntegrationTest;
 import org.apache.metron.integration.ComponentRunner;
 import org.apache.metron.integration.ProcessorResult;
@@ -48,7 +47,6 @@ import 
org.apache.metron.integration.components.ZKServerComponent;
 import org.apache.metron.integration.processors.KafkaMessageSet;
 import org.apache.metron.integration.processors.KafkaProcessor;
 import org.apache.metron.integration.utils.TestUtils;
-import org.apache.metron.test.mock.MockHTable;
 import org.apache.metron.test.utils.UnitTestHelper;
 import org.json.simple.parser.ParseException;
 import org.junit.Assert;
@@ -58,7 +56,6 @@ import org.junit.Test;
 import javax.annotation.Nullable;
 import java.io.File;
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -96,13 +93,6 @@ public class EnrichmentIntegrationTest extends 
BaseIntegrationTest {
 
   private static File geoHdfsFile;
 
-  public static class Provider implements TableProvider, Serializable {
-    MockHTable.Provider  provider = new MockHTable.Provider();
-    @Override
-    public HTableInterface getTable(Configuration config, String tableName) 
throws IOException {
-      return provider.getTable(config, tableName);
-    }
-  }
 
   private static List<byte[]> getInputMessages(String path){
     try{
@@ -138,7 +128,7 @@ public class EnrichmentIntegrationTest extends 
BaseIntegrationTest {
       setProperty("threatintel_error_topic", ERROR_TOPIC);
       setProperty("enrichment_join_cache_size", "1000");
       setProperty("threatintel_join_cache_size", "1000");
-      setProperty("enrichment_hbase_provider_impl", 
"org.apache.metron.enrichment.integration.EnrichmentIntegrationTest\\$Provider");
+      setProperty("enrichment_hbase_provider_impl", "" + 
MockHBaseTableProvider.class.getName());
       setProperty("enrichment_table", enrichmentsTableName);
       setProperty("enrichment_cf", cf);
       setProperty("enrichment_host_known_hosts", "[{\"ip\":\"10.1.128.236\", 
\"local\":\"YES\", \"type\":\"webserver\", \"asset_value\" : \"important\"}," +
@@ -171,7 +161,7 @@ public class EnrichmentIntegrationTest extends 
BaseIntegrationTest {
       File globalConfig = new File(new File(TestConstants.SAMPLE_CONFIG_PATH), 
"global.json");
       Map<String, Object> config = JSONUtils.INSTANCE.load(globalConfig, new 
TypeReference<Map<String, Object>>() {
       });
-      config.put(SimpleHBaseEnrichmentFunctions.TABLE_PROVIDER_TYPE_CONF, 
Provider.class.getName());
+      config.put(SimpleHBaseEnrichmentFunctions.TABLE_PROVIDER_TYPE_CONF, 
MockHBaseTableProvider.class.getName());
       config.put(SimpleHBaseEnrichmentFunctions.ACCESS_TRACKER_TYPE_CONF, 
"PERSISTENT_BLOOM");
       config.put(PersistentBloomTrackerCreator.Config.PERSISTENT_BLOOM_TABLE, 
trackerHBaseTableName);
       config.put(PersistentBloomTrackerCreator.Config.PERSISTENT_BLOOM_CF, cf);
@@ -184,12 +174,12 @@ public class EnrichmentIntegrationTest extends 
BaseIntegrationTest {
             .withEnrichmentConfigsPath(TestConstants.SAMPLE_CONFIG_PATH);
 
     //create MockHBaseTables
-    final MockHTable trackerTable = (MockHTable) 
MockHTable.Provider.addToCache(trackerHBaseTableName, cf);
-    final MockHTable threatIntelTable = (MockHTable) 
MockHTable.Provider.addToCache(threatIntelTableName, cf);
+    final MockHTable trackerTable = (MockHTable) 
MockHBaseTableProvider.addToCache(trackerHBaseTableName, cf);
+    final MockHTable threatIntelTable = (MockHTable) 
MockHBaseTableProvider.addToCache(threatIntelTableName, cf);
     EnrichmentHelper.INSTANCE.load(threatIntelTable, cf, new 
ArrayList<LookupKV<EnrichmentKey, EnrichmentValue>>() {{
       add(new LookupKV<>(new EnrichmentKey(MALICIOUS_IP_TYPE, "10.0.2.3"), new 
EnrichmentValue(new HashMap<>())));
     }});
-    final MockHTable enrichmentTable = (MockHTable) 
MockHTable.Provider.addToCache(enrichmentsTableName, cf);
+    final MockHTable enrichmentTable = (MockHTable) 
MockHBaseTableProvider.addToCache(enrichmentsTableName, cf);
     EnrichmentHelper.INSTANCE.load(enrichmentTable, cf, new 
ArrayList<LookupKV<EnrichmentKey, EnrichmentValue>>() {{
       add(new LookupKV<>(new EnrichmentKey(PLAYFUL_CLASSIFICATION_TYPE, 
"10.0.2.3")
                       , new EnrichmentValue(PLAYFUL_ENRICHMENT)

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java
 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java
index 5901d9f..e4625ac 100644
--- 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java
+++ 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/components/ConfigUploadComponent.java
@@ -18,10 +18,12 @@
 package org.apache.metron.enrichment.integration.components;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.configuration.ConfigurationsUtils;
 import org.apache.metron.common.configuration.SensorParserConfig;
 import org.apache.metron.integration.InMemoryComponent;
 import org.apache.metron.integration.UnableToStartException;
 import org.apache.metron.integration.components.ZKServerComponent;
+import org.apache.zookeeper.KeeperException;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -34,6 +36,7 @@ import static 
org.apache.metron.common.configuration.ConfigurationsUtils.*;
 
 public class ConfigUploadComponent implements InMemoryComponent {
 
+  private String connectionString;
   private Properties topologyProperties;
   private String globalConfigPath;
   private String parserConfigsPath;
@@ -43,6 +46,12 @@ public class ConfigUploadComponent implements 
InMemoryComponent {
   private Optional<Consumer<ConfigUploadComponent>> postStartCallback = 
Optional.empty();
   private Optional<String> globalConfig = Optional.empty();
   private Map<String, SensorParserConfig> parserSensorConfigs = new 
HashMap<>();
+
  /**
   * Sets an explicit ZooKeeper connection string to use for config upload;
   * when set, it takes precedence over the ZOOKEEPER_PROPERTY value from the
   * topology properties.
   *
   * @param connectionString ZooKeeper connection string (host:port,...)
   * @return this component, for fluent chaining
   */
  public ConfigUploadComponent withConnectionString(String connectionString) {
    this.connectionString = connectionString;
    return this;
  }
+
   public ConfigUploadComponent withTopologyProperties(Properties 
topologyProperties) {
     this.topologyProperties = topologyProperties;
     return this;
@@ -129,7 +138,7 @@ public class ConfigUploadComponent implements 
InMemoryComponent {
 
   public void update() throws UnableToStartException {
     try {
-      final String zookeeperUrl = 
topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY);
+      final String zookeeperUrl = connectionString == 
null?topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY):connectionString;
 
       if(globalConfigPath != null
       || parserConfigsPath != null
@@ -157,6 +166,7 @@ public class ConfigUploadComponent implements 
InMemoryComponent {
   }
 
 
+
   public SensorParserConfig getSensorParserConfig(String sensorType) {
     SensorParserConfig sensorParserConfig = new SensorParserConfig();
     CuratorFramework client = 
getClient(topologyProperties.getProperty(ZKServerComponent.ZOOKEEPER_PROPERTY));

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/mock/MockTableProvider.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/mock/MockTableProvider.java
 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/mock/MockTableProvider.java
deleted file mode 100644
index ac2904a..0000000
--- 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/integration/mock/MockTableProvider.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.metron.enrichment.integration.mock;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.metron.hbase.TableProvider;
-import org.apache.metron.test.mock.MockHTable;
-
-import java.io.IOException;
-import java.io.Serializable;
-
-public class MockTableProvider implements TableProvider, Serializable {
-  static MockHTable.Provider provider = new MockHTable.Provider();
-  @Override
-  public HTableInterface getTable(Configuration config, String tableName) 
throws IOException {
-    return provider.getTable(config, tableName);
-  }
-  public static void addTable(String tableName, String... cf) {
-    provider.addToCache(tableName, cf);
-  }
-  public static MockHTable getTable(String tableName) {
-    try {
-      return (MockHTable) provider.getTable(null, tableName);
-    } catch (IOException e) {
-      throw new RuntimeException("Unable to get table: " + tableName);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctionsTest.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctionsTest.java
 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctionsTest.java
index d0d41f7..dbbc7d5 100644
--- 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctionsTest.java
+++ 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/stellar/SimpleHBaseEnrichmentFunctionsTest.java
@@ -19,8 +19,8 @@
 package org.apache.metron.enrichment.stellar;
 
 import com.google.common.collect.ImmutableMap;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.metron.hbase.mock.MockHTable;
+import org.apache.metron.hbase.mock.MockHBaseTableProvider;
 import org.apache.metron.stellar.dsl.Context;
 import org.apache.metron.stellar.dsl.DefaultVariableResolver;
 import org.apache.metron.stellar.dsl.ParseException;
@@ -31,13 +31,10 @@ import 
org.apache.metron.enrichment.converter.EnrichmentHelper;
 import org.apache.metron.enrichment.converter.EnrichmentKey;
 import org.apache.metron.enrichment.converter.EnrichmentValue;
 import org.apache.metron.enrichment.lookup.LookupKV;
-import org.apache.metron.hbase.TableProvider;
-import org.apache.metron.test.mock.MockHTable;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -49,18 +46,12 @@ public class SimpleHBaseEnrichmentFunctionsTest {
   private String cf = "cf";
   private static Context context;
 
-  public static class TP implements TableProvider {
 
-    @Override
-    public HTableInterface getTable(Configuration config, String tableName) 
throws IOException {
-      return MockHTable.Provider.getFromCache(tableName);
-    }
-  }
 
   @Before
   public void setup() throws Exception {
 
-    final MockHTable hbaseTable = (MockHTable) 
MockHTable.Provider.addToCache(hbaseTableName, cf);
+    final MockHTable hbaseTable = (MockHTable) 
MockHBaseTableProvider.addToCache(hbaseTableName, cf);
     EnrichmentHelper.INSTANCE.load(hbaseTable, cf, new 
ArrayList<LookupKV<EnrichmentKey, EnrichmentValue>>() {{
       for(int i = 0;i < 5;++i) {
         add(new LookupKV<>(new EnrichmentKey(ENRICHMENT_TYPE, "indicator" + i)
@@ -72,7 +63,7 @@ public class SimpleHBaseEnrichmentFunctionsTest {
     context = new Context.Builder()
             .with( Context.Capabilities.GLOBAL_CONFIG
                  , () -> ImmutableMap.of( 
SimpleHBaseEnrichmentFunctions.TABLE_PROVIDER_TYPE_CONF
-                                        , TP.class.getName()
+                                        , 
MockHBaseTableProvider.class.getName()
                                         )
                  )
             .build();

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-hbase-client/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase-client/pom.xml 
b/metron-platform/metron-hbase-client/pom.xml
new file mode 100644
index 0000000..5dd6127
--- /dev/null
+++ b/metron-platform/metron-hbase-client/pom.xml
@@ -0,0 +1,100 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- 
+  Licensed to the Apache Software 
+       Foundation (ASF) under one or more contributor license agreements. See 
the 
+       NOTICE file distributed with this work for additional information 
regarding 
+       copyright ownership. The ASF licenses this file to You under the Apache 
License, 
+       Version 2.0 (the "License"); you may not use this file except in 
compliance 
+       with the License. You may obtain a copy of the License at 
http://www.apache.org/licenses/LICENSE-2.0 
+       Unless required by applicable law or agreed to in writing, software 
distributed 
+       under the License is distributed on an "AS IS" BASIS, WITHOUT 
WARRANTIES 
+       OR CONDITIONS OF ANY KIND, either express or implied. See the License 
for 
+  the specific language governing permissions and limitations under the 
License. 
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.metron</groupId>
+        <artifactId>metron-platform</artifactId>
+        <version>0.4.1</version>
+    </parent>
+    <!--
+    The purpose of this project is to provide a version of the hbase 
client which has a shaded and relocated guava.
+    This was motivated by the metron-rest project having transitive 
dependencies (swagger) which conflict strongly
+    with the transitive dependencies of hbase-client (specifically guava 
incompatibilities).
+
+    The HBase team provides a shaded and relocated client artifact, BUT some 
of the *other* transitive dependencies
+    are not shaded and relocated properly. The purpose of this project is to 
add the transitive dependencies with
+    their properly relocated package prefixes. Currently we have not chosen to 
retrofit the rest of the project with
+    this dependency due to the significance of the change, but it may be 
useful to do so in the future.
+    -->
+    <artifactId>metron-hbase-client</artifactId>
+    <name>metron-hbase-client</name>
+    <url>https://metron.apache.org/</url>
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <!-- Note: This is 1.1.2 and not ${global_hbase_version} to avoid 
https://issues.apache.org/jira/browse/HBASE-13889 -->
+        <!-- when we migrate to a version > 1.1.2, this can be set to 
${global_hbase_version} -->
+        <shaded.client.version>1.1.2</shaded.client.version>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-shaded-client</artifactId>
+            <version>${shaded.client.version}</version>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>${global_jar_version}</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>${global_shade_version}</version>
+                <configuration>
+                    
<createDependencyReducedPom>true</createDependencyReducedPom>
+                    <artifactSet>
+                        <excludes>
+                            <exclude>*slf4j*</exclude>
+                        </excludes>
+                    </artifactSet>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <relocations>
+                                <relocation>
+                                    
<pattern>org.apache.commons.logging</pattern>
+                                    
<shadedPattern>org.apache.hadoop.hbase.shaded.org.apache.commons.logging</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.log4j</pattern>
+                                    
<shadedPattern>org.apache.hadoop.hbase.shaded.org.apache.log4j</shadedPattern>
+                                </relocation>
+                            </relocations>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/metron-platform/metron-hbase/pom.xml 
b/metron-platform/metron-hbase/pom.xml
index cd5f641..180134f 100644
--- a/metron-platform/metron-hbase/pom.xml
+++ b/metron-platform/metron-hbase/pom.xml
@@ -232,4 +232,20 @@
             <scope>provided</scope>
         </dependency>
     </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>${global_jar_version}</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
 </project>

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TableProvider.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TableProvider.java
 
b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TableProvider.java
index dc0569e..1804697 100644
--- 
a/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TableProvider.java
+++ 
b/metron-platform/metron-hbase/src/main/java/org/apache/metron/hbase/TableProvider.java
@@ -22,7 +22,16 @@ import org.apache.hadoop.hbase.client.HTableInterface;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.lang.reflect.InvocationTargetException;
+import java.util.function.Supplier;
 
 public interface TableProvider extends Serializable {
-    HTableInterface getTable(Configuration config, String tableName) throws 
IOException;
+  HTableInterface getTable(Configuration config, String tableName) throws 
IOException;
+  static TableProvider create(String impl, Supplier<TableProvider> 
defaultSupplier) throws ClassNotFoundException, NoSuchMethodException, 
IllegalAccessException, InvocationTargetException, InstantiationException {
+    if(impl == null) {
+      return defaultSupplier.get();
+    }
+    Class<? extends TableProvider> clazz = (Class<? extends TableProvider>) 
Class.forName(impl);
+    return clazz.getConstructor().newInstance();
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/813adf2d/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/mock/MockHBaseTableProvider.java
----------------------------------------------------------------------
diff --git 
a/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/mock/MockHBaseTableProvider.java
 
b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/mock/MockHBaseTableProvider.java
new file mode 100644
index 0000000..57981ac
--- /dev/null
+++ 
b/metron-platform/metron-hbase/src/test/java/org/apache/metron/hbase/mock/MockHBaseTableProvider.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.hbase.mock;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.metron.hbase.TableProvider;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MockHBaseTableProvider implements Serializable, TableProvider {
+  private static Map<String, HTableInterface> _cache = new HashMap<>();
+  public HTableInterface getTable(Configuration config, String tableName) 
throws IOException {
+    HTableInterface ret = _cache.get(tableName);
+    return ret;
+  }
+
+  public static HTableInterface getFromCache(String tableName) {
+    return _cache.get(tableName);
+  }
+
+  public static HTableInterface addToCache(String tableName, String... 
columnFamilies) {
+    MockHTable ret =  new MockHTable(tableName, columnFamilies);
+    _cache.put(tableName, ret);
+    return ret;
+  }
+
+  public static void clear() {
+    _cache.clear();
+  }
+}

Reply via email to