Github user mmiklavc commented on a diff in the pull request:
https://github.com/apache/metron/pull/1247#discussion_r236521838
--- Diff:
metron-platform/metron-solr/src/test/java/org/apache/metron/solr/integration/SolrUpdateIntegrationTest.java
---
@@ -186,4 +195,114 @@ public void testHugeErrorFields() throws Exception {
exception.expectMessage("Document contains at least one immense term
in field=\"error_hash\"");
getDao().update(errorDoc, Optional.of("error"));
}
+
+ @Test
+ @Override
+ public void test() throws Exception {
+ List<Map<String, Object>> inputData = new ArrayList<>();
+ for(int i = 0; i < 10;++i) {
+ final String name = "message" + i;
+ inputData.add(
+ new HashMap<String, Object>() {{
+ put("source.type", SENSOR_NAME);
+ put("name" , name);
+ put("timestamp", System.currentTimeMillis());
+ put(Constants.GUID, name);
+ }}
+ );
+ }
+ addTestData(getIndexName(), SENSOR_NAME, inputData);
+ List<Map<String,Object>> docs = null;
+ for(int t = 0;t < MAX_RETRIES;++t, Thread.sleep(SLEEP_MS)) {
+ docs = getIndexedTestData(getIndexName(), SENSOR_NAME);
+ if(docs.size() >= 10) {
+ break;
+ }
+ }
+ Assert.assertEquals(10, docs.size());
+ //modify the first message and add a new field
+ {
+ Map<String, Object> message0 = new HashMap<String,
Object>(inputData.get(0)) {{
+ put("new-field", "metron");
+ }};
+ String guid = "" + message0.get(Constants.GUID);
+ Document update = getDao().replace(new ReplaceRequest(){{
+ setReplacement(message0);
+ setGuid(guid);
+ setSensorType(SENSOR_NAME);
+ setIndex(getIndexName());
+ }}, Optional.empty());
+
+ Assert.assertEquals(message0, update.getDocument());
+ Assert.assertEquals(1, getMockHTable().size());
+ findUpdatedDoc(message0, guid, SENSOR_NAME);
+ {
+ //ensure hbase is up to date
+ Get g = new Get(HBaseDao.Key.toBytes(new HBaseDao.Key(guid,
SENSOR_NAME)));
+ Result r = getMockHTable().get(g);
+ NavigableMap<byte[], byte[]> columns =
r.getFamilyMap(CF.getBytes());
+ Assert.assertEquals(1, columns.size());
+ Assert.assertEquals(message0
+ , JSONUtils.INSTANCE.load(new
String(columns.lastEntry().getValue())
+ , JSONUtils.MAP_SUPPLIER)
+ );
+ }
+ {
+ //ensure ES is up-to-date
+ long cnt = 0;
+ for (int t = 0; t < MAX_RETRIES && cnt == 0; ++t,
Thread.sleep(SLEEP_MS)) {
+ docs = getIndexedTestData(getIndexName(), SENSOR_NAME);
+ cnt = docs
+ .stream()
+ .filter(d ->
message0.get("new-field").equals(d.get("new-field")))
+ .count();
+ }
+ Assert.assertNotEquals("Data store is not updated!", cnt, 0);
+ }
+ }
+ //modify the same message and modify the new field
+ {
+ Map<String, Object> message0 = new HashMap<String,
Object>(inputData.get(0)) {{
+ put("new-field", "metron2");
+ }};
+ String guid = "" + message0.get(Constants.GUID);
+ Document update = getDao().replace(new ReplaceRequest(){{
+ setReplacement(message0);
+ setGuid(guid);
+ setSensorType(SENSOR_NAME);
+ setIndex(getIndexName());
+ }}, Optional.empty());
+ Assert.assertEquals(message0, update.getDocument());
+ Assert.assertEquals(1, getMockHTable().size());
+ Document doc = getDao().getLatest(guid, SENSOR_NAME);
+ Assert.assertEquals(message0, doc.getDocument());
+ findUpdatedDoc(message0, guid, SENSOR_NAME);
+ {
+ //ensure hbase is up to date
+ Get g = new Get(HBaseDao.Key.toBytes(new HBaseDao.Key(guid,
SENSOR_NAME)));
+ Result r = getMockHTable().get(g);
+ NavigableMap<byte[], byte[]> columns =
r.getFamilyMap(CF.getBytes());
+ Assert.assertEquals(2, columns.size());
+ Assert.assertEquals(message0, JSONUtils.INSTANCE.load(new
String(columns.lastEntry().getValue())
+ , JSONUtils.MAP_SUPPLIER)
+ );
+ Assert.assertNotEquals(message0, JSONUtils.INSTANCE.load(new
String(columns.firstEntry().getValue())
+ , JSONUtils.MAP_SUPPLIER)
+ );
+ }
+ {
+ //ensure ES is up-to-date
--- End diff --
This code comment should say "Solr" rather than "ES", since this is the Solr integration test.
---