Repository: metron Updated Branches: refs/heads/master b4d76f98e -> 4ef65e09e
METRON-1889: Add any missing timestamp fields to unified enrichment topology (mmiklavc via mmiklavc) closes apache/metron#1286 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/4ef65e09 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/4ef65e09 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/4ef65e09 Branch: refs/heads/master Commit: 4ef65e09ea4a1eac8abf89521e5a999faeca1f37 Parents: b4d76f9 Author: mmiklavc <[email protected]> Authored: Tue Dec 4 07:27:33 2018 -0700 Committer: Michael Miklavcic <[email protected]> Committed: Tue Dec 4 07:27:33 2018 -0700 ---------------------------------------------------------------------- .../enrichment/parallel/ParallelEnricher.java | 10 +- .../enrichment/utils/EnrichmentUtils.java | 13 +-- .../parallel/ParallelEnricherTest.java | 104 ++++++++++++------- 3 files changed, 77 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/4ef65e09/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java index b10c148..1de8945 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/parallel/ParallelEnricher.java @@ -157,6 +157,7 @@ public class ParallelEnricher { throw new IllegalStateException("Unable to find an adapter for " + task.getKey() + ", possible adapters are: " + Joiner.on(",").join(enrichmentsByType.keySet())); } + message.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".begin.ts", "" + System.currentTimeMillis()); for(JSONObject m : task.getValue()) { /* now for each unit of work (each of these only has one element in them) * the key is the field name and the value is value associated with that field. @@ -171,6 +172,7 @@ public class ParallelEnricher { String field = (String) o; Object value = m.get(o); if(value == null) { + message.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".end.ts", "" + System.currentTimeMillis()); continue; } CacheKey cacheKey = new CacheKey(field, value, config); @@ -182,7 +184,10 @@ public class ParallelEnricher { ret = new JSONObject(); } //each enrichment has their own unique prefix to use to adjust the keys for the enriched fields. - return EnrichmentUtils.adjustKeys(new JSONObject(), ret, cacheKey.getField(), prefix); + JSONObject adjustedKeys = EnrichmentUtils + .adjustKeys(new JSONObject(), ret, cacheKey.getField(), prefix); + adjustedKeys.put("adapter." + adapter.getClass().getSimpleName().toLowerCase() + ".end.ts", "" + System.currentTimeMillis()); + return adjustedKeys; } catch (Throwable e) { JSONObject errorMessage = new JSONObject(); errorMessage.putAll(m); @@ -197,11 +202,12 @@ public class ParallelEnricher { } } if(taskList.isEmpty()) { + message.put(getClass().getSimpleName().toLowerCase() + ".enrich.end.ts", "" + System.currentTimeMillis()); return new EnrichmentResult(message, errors); } EnrichmentResult ret = new EnrichmentResult(all(taskList, message, (left, right) -> join(left, right)).get(), errors); - message.put(getClass().getSimpleName().toLowerCase() + ".enrich.end.ts", "" + System.currentTimeMillis()); + ret.getResult().put(getClass().getSimpleName().toLowerCase() + ".enrich.end.ts", "" + System.currentTimeMillis()); if(perfLog != null) { String key = message.get(Constants.GUID) + ""; perfLog.log("enrich", "key={}, elapsed time to enrich", key); http://git-wip-us.apache.org/repos/asf/metron/blob/4ef65e09/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java index 63d39c5..9a36a87 100644 --- a/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java +++ b/metron-platform/metron-enrichment/src/main/java/org/apache/metron/enrichment/utils/EnrichmentUtils.java @@ -21,21 +21,18 @@ import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Splitter; import com.google.common.collect.Iterables; +import java.lang.reflect.InvocationTargetException; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nullable; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.metron.common.configuration.enrichment.EnrichmentConfig; -import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; +import org.apache.metron.enrichment.converter.EnrichmentKey; import org.apache.metron.enrichment.lookup.EnrichmentLookup; import org.apache.metron.enrichment.lookup.handler.KeyWithContext; import org.apache.metron.hbase.TableProvider; -import org.apache.metron.enrichment.converter.EnrichmentKey; import org.json.simple.JSONObject; -import sun.management.Sensor; - -import javax.annotation.Nullable; -import java.lang.reflect.InvocationTargetException; -import java.util.HashMap; -import java.util.Map; public class EnrichmentUtils { http://git-wip-us.apache.org/repos/asf/metron/blob/4ef65e09/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java index d4fcdf4..a6832d6 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/parallel/ParallelEnricherTest.java @@ -65,51 +65,57 @@ public class ParallelEnricherTest { private static Context stellarContext; private static AtomicInteger numAccesses = new AtomicInteger(0); private static Map<String, EnrichmentAdapter<CacheKey>> enrichmentsByType; - @BeforeClass - public static void setup() { - ConcurrencyContext infrastructure = new ConcurrencyContext(); - infrastructure.initialize(5, 100, 10, null, null, false); - stellarContext = new Context.Builder() - .build(); - StellarFunctions.initialize(stellarContext); - StellarAdapter adapter = new StellarAdapter(){ - @Override - public void logAccess(CacheKey value) { - numAccesses.incrementAndGet(); - } - }.ofType("ENRICHMENT"); - adapter.initializeAdapter(new HashMap<>()); - EnrichmentAdapter<CacheKey> dummy = new EnrichmentAdapter<CacheKey>() { - @Override - public void logAccess(CacheKey value) { + // Declaring explicit class bc getClass().getSimpleName() returns "" for anon classes + public static class DummyEnrichmentAdapter implements EnrichmentAdapter<CacheKey> { + @Override + public void logAccess(CacheKey value) { - } + } - @Override - public JSONObject enrich(CacheKey value) { - return null; - } + @Override + public JSONObject enrich(CacheKey value) { + return null; + } - @Override - public boolean initializeAdapter(Map<String, Object> config) { - return false; - } + @Override + public boolean initializeAdapter(Map<String, Object> config) { + return false; + } - @Override - public void updateAdapter(Map<String, Object> config) { + @Override + public void updateAdapter(Map<String, Object> config) { - } + } - @Override - public void cleanup() { + @Override + public void cleanup() { - } + } - @Override - public String getOutputPrefix(CacheKey value) { - return null; - } - }; + @Override + public String getOutputPrefix(CacheKey value) { + return null; + } + } + + // Declaring explicit class bc getClass().getSimpleName() returns "" for anon classes + public static class AccessLoggingStellarAdapter extends StellarAdapter { + @Override + public void logAccess(CacheKey value) { + numAccesses.incrementAndGet(); + } + } + + @BeforeClass + public static void setup() { + ConcurrencyContext infrastructure = new ConcurrencyContext(); + infrastructure.initialize(5, 100, 10, null, null, false); + stellarContext = new Context.Builder() + .build(); + StellarFunctions.initialize(stellarContext); + StellarAdapter adapter = new AccessLoggingStellarAdapter().ofType("ENRICHMENT"); + adapter.initializeAdapter(new HashMap<>()); + EnrichmentAdapter<CacheKey> dummy = new DummyEnrichmentAdapter(); enrichmentsByType = ImmutableMap.of("stellar", adapter, "dummy", dummy); enricher = new ParallelEnricher(enrichmentsByType, infrastructure, false); @@ -139,13 +145,19 @@ public class ParallelEnricherTest { }}; ParallelEnricher.EnrichmentResult result = enricher.apply(message, EnrichmentStrategies.ENRICHMENT, config, null); JSONObject ret = result.getResult(); - Assert.assertEquals("Got the wrong result count: " + ret, 8, ret.size()); + Assert.assertEquals("Got the wrong result count: " + ret, 11, ret.size()); Assert.assertEquals(1, ret.get("map.blah")); Assert.assertEquals("test", ret.get("source.type")); Assert.assertEquals(1, ret.get("one")); Assert.assertEquals(2, ret.get("foo")); Assert.assertEquals("TEST", ret.get("ALL_CAPS")); Assert.assertEquals(0, result.getEnrichmentErrors().size()); + Assert.assertTrue(result.getResult().containsKey("adapter.accessloggingstellaradapter.begin.ts")); + Assert.assertTrue(result.getResult().containsKey("adapter.accessloggingstellaradapter.end.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.begin.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.end.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.begin.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.end.ts")); } /** * { @@ -170,7 +182,13 @@ public class ParallelEnricherTest { }}; ParallelEnricher.EnrichmentResult result = enricher.apply(message, EnrichmentStrategies.ENRICHMENT, config, null); JSONObject ret = result.getResult(); - Assert.assertEquals("Got the wrong result count: " + ret, 4, ret.size()); + Assert.assertEquals("Got the wrong result count: " + ret, 7, ret.size()); + Assert.assertTrue(result.getResult().containsKey("adapter.dummyenrichmentadapter.begin.ts")); + Assert.assertTrue(result.getResult().containsKey("adapter.dummyenrichmentadapter.end.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.begin.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.end.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.begin.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.end.ts")); } /** @@ -208,13 +226,19 @@ public class ParallelEnricherTest { }}; ParallelEnricher.EnrichmentResult result = enricher.apply(message, EnrichmentStrategies.ENRICHMENT, config, null); JSONObject ret = result.getResult(); - Assert.assertEquals(ret + " is not what I expected", 8, ret.size()); + Assert.assertEquals(ret + " is not what I expected", 11, ret.size()); Assert.assertEquals(1, ret.get("map.blah")); Assert.assertEquals("test", ret.get("source.type")); Assert.assertEquals(1, ret.get("one")); Assert.assertEquals(2, ret.get("foo")); Assert.assertEquals("TEST", ret.get("ALL_CAPS")); Assert.assertEquals(1, result.getEnrichmentErrors().size()); + Assert.assertTrue(result.getResult().containsKey("adapter.accessloggingstellaradapter.begin.ts")); + Assert.assertTrue(result.getResult().containsKey("adapter.accessloggingstellaradapter.end.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.begin.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.splitter.end.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.begin.ts")); + Assert.assertTrue(result.getResult().containsKey("parallelenricher.enrich.end.ts")); } /**
