Repository: metron Updated Branches: refs/heads/master 1d89c3183 -> 38d41fe67
METRON-1121 Ignore Profile with Bad 'init', 'update' or 'groupBy' Expressions (nickwallen) closes apache/metron#707 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/38d41fe6 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/38d41fe6 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/38d41fe6 Branch: refs/heads/master Commit: 38d41fe67dee15728b50d930f20d8b19ad935009 Parents: 1d89c31 Author: nickwallen <[email protected]> Authored: Thu Aug 24 10:08:14 2017 -0400 Committer: nickallen <[email protected]> Committed: Thu Aug 24 10:08:14 2017 -0400 ---------------------------------------------------------------------- .../profiler/DefaultMessageDistributor.java | 5 +- .../metron/profiler/DefaultProfileBuilder.java | 75 +++--- .../apache/metron/profiler/ProfileBuilder.java | 4 +- .../profiler/DefaultProfileBuilderTest.java | 261 +++++++++++++++++-- 4 files changed, 283 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/38d41fe6/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java index ba3c0d8..53377a0 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java @@ -29,6 +29,7 @@ import org.json.simple.JSONObject; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -98,8 +99,8 @@ public class DefaultMessageDistributor implements MessageDistributor { profileCache.asMap().forEach((key, profileBuilder) -> { if(profileBuilder.isInitialized()) { - ProfileMeasurement measurement = profileBuilder.flush(); - measurements.add(measurement); + Optional<ProfileMeasurement> measurement = profileBuilder.flush(); + measurement.ifPresent(measurements::add); } }); http://git-wip-us.apache.org/repos/asf/metron/blob/38d41fe6/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java index 41c31d7..3ceeed7 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.commons.collections4.ListUtils; @@ -120,15 +121,18 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable { * @param message The message to apply. */ @Override - @SuppressWarnings("unchecked") public void apply(JSONObject message) { + try { + if (!isInitialized()) { + assign(definition.getInit(), message, "init"); + isInitialized = true; + } - if(!isInitialized()) { - assign(definition.getInit(), message, "init"); - isInitialized = true; - } + assign(definition.getUpdate(), message, "update"); - assign(definition.getUpdate(), message, "update"); + } catch(Throwable e) { + LOG.error(format("Unable to apply message to profile: %s", e.getMessage()), e); + } } /** @@ -140,33 +144,41 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable { * @return Returns the completed profile measurement. */ @Override - public ProfileMeasurement flush() { + public Optional<ProfileMeasurement> flush() { LOG.debug("Flushing profile: profile={}, entity={}", profileName, entity); + Optional<ProfileMeasurement> result = Optional.empty(); - // execute the 'profile' expression(s) - @SuppressWarnings("unchecked") - Object profileValue = execute(definition.getResult().getProfileExpressions().getExpression(), "result/profile"); - - // execute the 'triage' expression(s) - Map<String, Object> triageValues = definition.getResult().getTriageExpressions().getExpressions() - .entrySet() - .stream() - .collect(Collectors.toMap( - e -> e.getKey(), - e -> execute(e.getValue(), "result/triage"))); - - // execute the 'groupBy' expression(s) - can refer to value of 'result' expression - List<Object> groups = execute(definition.getGroupBy(), ImmutableMap.of("result", profileValue), "groupBy"); + try { + // execute the 'profile' expression(s) + Object profileValue = execute(definition.getResult().getProfileExpressions().getExpression(), "result/profile"); + + // execute the 'triage' expression(s) + Map<String, Object> triageValues = definition.getResult().getTriageExpressions().getExpressions() + .entrySet() + .stream() + .collect(Collectors.toMap( + e -> e.getKey(), + e -> execute(e.getValue(), "result/triage"))); + + // execute the 'groupBy' expression(s) - can refer to value of 'result' expression + List<Object> groups = execute(definition.getGroupBy(), ImmutableMap.of("result", profileValue), "groupBy"); + + result = Optional.of(new ProfileMeasurement() + .withProfileName(profileName) + .withEntity(entity) + .withGroups(groups) + .withPeriod(clock.currentTimeMillis(), periodDurationMillis, TimeUnit.MILLISECONDS) + .withProfileValue(profileValue) + .withTriageValues(triageValues) + .withDefinition(definition)); + + } catch(Throwable e) { + // if any of the Stellar expressions fail, a measurement should NOT be returned + LOG.error(format("Unable to flush profile: %s", e.getMessage()), e); + } isInitialized = false; - return new ProfileMeasurement() - .withProfileName(profileName) - .withEntity(entity) - .withGroups(groups) - .withPeriod(clock.currentTimeMillis(), periodDurationMillis, TimeUnit.MILLISECONDS) - .withProfileValue(profileValue) - .withTriageValues(triageValues) - .withDefinition(definition); + return result; } /** @@ -233,7 +245,7 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable { } catch(ParseException e) { // make it brilliantly clear that one of the 'update' expressions is bad - String msg = format("Bad '%s' expression: %s, profile=%s, entity=%s", expressionType, e.getMessage(), profileName, entity); + String msg = format("Bad '%s' expression: error=%s, profile=%s, entity=%s", expressionType, e.getMessage(), profileName, entity); throw new ParseException(msg, e); } } @@ -253,7 +265,8 @@ public class DefaultProfileBuilder implements ProfileBuilder, Serializable { .forEach((expr) -> results.add(executor.execute(expr, transientState, Object.class))); } catch (Throwable e) { - String msg = format("Bad '%s' expression: %s, profile=%s, entity=%s", expressionType, e.getMessage(), profileName, entity); + String msg = format("Bad '%s' expression: error=%s, profile=%s, entity=%s", expressionType, e.getMessage(), profileName, entity); + LOG.error(msg, e); throw new ParseException(msg, e); } http://git-wip-us.apache.org/repos/asf/metron/blob/38d41fe6/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java index 9a5407b..d1fbc11 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java @@ -23,6 +23,8 @@ package org.apache.metron.profiler; import org.apache.metron.common.configuration.profiler.ProfileConfig; import org.json.simple.JSONObject; +import java.util.Optional; + /** * Responsible for building and maintaining a Profile. * @@ -49,7 +51,7 @@ public interface ProfileBuilder { * * @return Returns the completed profile measurement. */ - ProfileMeasurement flush(); + Optional<ProfileMeasurement> flush(); /** * Has the ProfileBuilder been initialized? http://git-wip-us.apache.org/repos/asf/metron/blob/38d41fe6/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java index fa5c19c..7c8664a 100644 --- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java +++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java @@ -30,10 +30,13 @@ import org.json.simple.parser.JSONParser; import org.junit.Before; import org.junit.Test; +import java.util.Optional; import java.util.concurrent.TimeUnit; import static org.apache.metron.stellar.common.utils.ConversionUtils.convert; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; /** * Tests the ProfileBuilder class. @@ -88,10 +91,11 @@ public class DefaultProfileBuilderTest { // execute builder.apply(message); - ProfileMeasurement m = builder.flush(); + Optional<ProfileMeasurement> m = builder.flush(); + assertTrue(m.isPresent()); // validate that x = 100, y = 200 - assertEquals(100 + 200, (int) convert(m.getProfileValue(), Integer.class)); + assertEquals(100 + 200, (int) convert(m.get().getProfileValue(), Integer.class)); } /** @@ -110,10 +114,11 @@ public class DefaultProfileBuilderTest { .build(); // execute - ProfileMeasurement m = builder.flush(); + Optional<ProfileMeasurement> m = builder.flush(); + assertTrue(m.isPresent()); // validate that x = 0 and y = 0 as no initialization occurred - assertEquals(0, (int) convert(m.getProfileValue(), Integer.class)); + assertEquals(0, (int) convert(m.get().getProfileValue(), Integer.class)); } /** @@ -153,10 +158,11 @@ public class DefaultProfileBuilderTest { for(int i=0; i<count; i++) { builder.apply(message); } - ProfileMeasurement m = builder.flush(); + Optional<ProfileMeasurement> m = builder.flush(); + assertTrue(m.isPresent()); // validate that x=0, y=0 then x+=1, y+=2 for each message - assertEquals(count*1 + count*2, (int) convert(m.getProfileValue(), Integer.class)); + assertEquals(count*1 + count*2, (int) convert(m.get().getProfileValue(), Integer.class)); } /** @@ -186,10 +192,11 @@ public class DefaultProfileBuilderTest { // execute builder.apply(message); - ProfileMeasurement m = builder.flush(); + Optional<ProfileMeasurement> m = builder.flush(); + assertTrue(m.isPresent()); // validate - assertEquals(100, (int) convert(m.getProfileValue(), Integer.class)); + assertEquals(100, (int) convert(m.get().getProfileValue(), Integer.class)); } /** @@ -213,11 +220,12 @@ public class DefaultProfileBuilderTest { { // apply a message and flush builder.apply(message); - ProfileMeasurement m = builder.flush(); + Optional<ProfileMeasurement> m = builder.flush(); + assertTrue(m.isPresent()); // validate the profile period ProfilePeriod expected = new ProfilePeriod(clock.currentTimeMillis(), 10, TimeUnit.MINUTES); - assertEquals(expected, m.getPeriod()); + assertEquals(expected, m.get().getPeriod()); } { // advance time by at least one period - 10 minutes @@ -225,11 +233,12 @@ public class DefaultProfileBuilderTest { // apply a message and flush again builder.apply(message); - ProfileMeasurement m = builder.flush(); + Optional<ProfileMeasurement> m = builder.flush(); + assertTrue(m.isPresent()); // validate the profile period ProfilePeriod expected = new ProfilePeriod(clock.currentTimeMillis(), 10, TimeUnit.MINUTES); - assertEquals(expected, m.getPeriod()); + assertEquals(expected, m.get().getPeriod()); } } @@ -262,12 +271,13 @@ public class DefaultProfileBuilderTest { // execute builder.apply(message); - ProfileMeasurement m = builder.flush(); + Optional<ProfileMeasurement> m = builder.flush(); + assertTrue(m.isPresent()); // validate - assertEquals(2, m.getGroups().size()); - assertEquals(100, m.getGroups().get(0)); - assertEquals(200, m.getGroups().get(1)); + assertEquals(2, m.get().getGroups().size()); + assertEquals(100, m.get().getGroups().get(0)); + assertEquals(200, m.get().getGroups().get(1)); } /** @@ -308,10 +318,11 @@ public class DefaultProfileBuilderTest { // apply another message to accumulate new state, then flush again to validate original state was cleared builder.apply(message); - ProfileMeasurement m = builder.flush(); + Optional<ProfileMeasurement> m = builder.flush(); + assertTrue(m.isPresent()); // validate - assertEquals(33, m.getProfileValue()); + assertEquals(33, m.get().getProfileValue()); } /** @@ -352,10 +363,11 @@ public class DefaultProfileBuilderTest { // apply another message to accumulate new state, then flush again to validate original state was cleared builder.apply(message); - ProfileMeasurement m = builder.flush(); + Optional<ProfileMeasurement> m = builder.flush(); + assertTrue(m.isPresent()); // validate - assertEquals(3, m.getProfileValue()); + assertEquals(3, m.get().getProfileValue()); } /** * { @@ -384,10 +396,11 @@ public class DefaultProfileBuilderTest { // execute builder.apply(message); - ProfileMeasurement m = builder.flush(); + Optional<ProfileMeasurement> m = builder.flush(); + assertTrue(m.isPresent()); // validate - assertEquals(entity, m.getEntity()); + assertEquals(entity, m.get().getEntity()); } /** @@ -421,10 +434,11 @@ public class DefaultProfileBuilderTest { // execute builder.apply(message); - ProfileMeasurement m = builder.flush(); + Optional<ProfileMeasurement> m = builder.flush(); + assertTrue(m.isPresent()); // validate - assertEquals(100, m.getProfileValue()); + assertEquals(100, m.get().getProfileValue()); } /** @@ -462,11 +476,202 @@ public class DefaultProfileBuilderTest { // execute builder.apply(message); - ProfileMeasurement m = builder.flush(); + Optional<ProfileMeasurement> m = builder.flush(); + assertTrue(m.isPresent()); // validate - assertEquals(0, m.getTriageValues().get("zero")); - assertEquals(100, m.getTriageValues().get("hundred")); - assertEquals(100, m.getProfileValue()); + assertEquals(0, m.get().getTriageValues().get("zero")); + assertEquals(100, m.get().getTriageValues().get("hundred")); + assertEquals(100, m.get().getProfileValue()); + } + + /** + * { + * "profile": "bad-init", + * "foreach": "ip_src_addr", + * "init": { "x": "2 / 0" }, + * "update": { "x": "x + 1" }, + * "result": "x + y", + * "groupBy": ["cheese"] + * } + */ + @Multiline + private String badInitProfile; + + @Test + public void testBadInitExpression() throws Exception { + // setup + definition = JSONUtils.INSTANCE.load(badInitProfile, ProfileConfig.class); + builder = new DefaultProfileBuilder.Builder() + .withDefinition(definition) + .withEntity("10.0.0.1") + .withPeriodDuration(10, TimeUnit.MINUTES) + .withContext(Context.EMPTY_CONTEXT()) + .build(); + + // due to the bad expression, there should be no result + builder.apply(message); + assertFalse(builder.flush().isPresent()); + } + + /** + * { + * "profile": "bad-simple-result", + * "foreach": "ip_src_addr", + * "init": { "x": "0" }, + * "update": { "x": "x + 1" }, + * "result": "2 / 0", + * "groupBy": ["cheese"] + * } + */ + @Multiline + private String badSimpleResultProfile; + + @Test + public void testBadResultExpression() throws Exception { + // setup + definition = JSONUtils.INSTANCE.load(badSimpleResultProfile, ProfileConfig.class); + builder = new DefaultProfileBuilder.Builder() + .withDefinition(definition) + .withEntity("10.0.0.1") + .withPeriodDuration(10, TimeUnit.MINUTES) + .withContext(Context.EMPTY_CONTEXT()) + .build(); + + // due to the bad expression, there should be no result + builder.apply(message); + assertFalse(builder.flush().isPresent()); + } + + /** + * { + * "profile": "bad-groupBy", + * "foreach": "ip_src_addr", + * "init": { "x": "0" }, + * "update": { "x": "x + 1" }, + * "result": "x", + * "groupBy": ["2 / 0"] + * } + */ + @Multiline + private String badGroupByProfile; + + @Test + public void testBadGroupByExpression() throws Exception { + // setup + definition = JSONUtils.INSTANCE.load(badGroupByProfile, ProfileConfig.class); + builder = new DefaultProfileBuilder.Builder() + .withDefinition(definition) + .withEntity("10.0.0.1") + .withPeriodDuration(10, TimeUnit.MINUTES) + .withContext(Context.EMPTY_CONTEXT()) + .build(); + + // due to the bad expression, there should be no result + builder.apply(message); + assertFalse(builder.flush().isPresent()); + } + + /** + * { + * "profile": "bad-result-profile", + * "foreach": "ip_src_addr", + * "init": { "x": "100" }, + * "result": { + * "profile": "2 / 0", + * "triage": { + * "zero": "x - 100", + * "hundred": "x" + * } + * } + * } + */ + @Multiline + private String badResultProfile; + + @Test + public void testBadResultProfileExpression() throws Exception { + // setup + definition = JSONUtils.INSTANCE.load(badResultProfile, ProfileConfig.class); + builder = new DefaultProfileBuilder.Builder() + .withDefinition(definition) + .withEntity("10.0.0.1") + .withPeriodDuration(10, TimeUnit.MINUTES) + .withContext(Context.EMPTY_CONTEXT()) + .build(); + + // due to the bad expression, there should be no result + builder.apply(message); + assertFalse(builder.flush().isPresent()); + } + + /** + * { + * "profile": "bad-result-triage", + * "foreach": "ip_src_addr", + * "init": { "x": "100" }, + * "result": { + * "profile": "x", + * "triage": { + * "zero": "x - 100", + * "hundred": "2 / 0" + * } + * } + * } + */ + @Multiline + private String badResultTriage; + + @Test + public void testBadResultTriageExpression() throws Exception { + // setup + definition = JSONUtils.INSTANCE.load(badResultTriage, ProfileConfig.class); + builder = new DefaultProfileBuilder.Builder() + .withDefinition(definition) + .withEntity("10.0.0.1") + .withPeriodDuration(10, TimeUnit.MINUTES) + .withContext(Context.EMPTY_CONTEXT()) + .build(); + + // due to the bad expression, there should be no result + builder.apply(message); + assertFalse(builder.flush().isPresent()); + } + + /** + * { + * "profile": "bad-update", + * "foreach": "ip_src_addr", + * "init": { "x": "0" }, + * "update": { "x": "x + (2/0)" }, + * "result": "x" + * } + */ + @Multiline + private String badUpdateProfile; + + /** + * If the 'init' expression succeeds, but the 'update' fails, the profile should still flush. We cannot + * be sure if the 'update' is failing on every message or just one. Since that is the case, the profile + * flushes whatever data it has. + */ + @Test + public void testBadUpdateExpression() throws Exception { + // setup + definition = JSONUtils.INSTANCE.load(badUpdateProfile, ProfileConfig.class); + builder = new DefaultProfileBuilder.Builder() + .withDefinition(definition) + .withEntity("10.0.0.1") + .withPeriodDuration(10, TimeUnit.MINUTES) + .withContext(Context.EMPTY_CONTEXT()) + .build(); + + // execute + builder.apply(message); + + // if the update expression fails, the profile should still flush. + Optional<ProfileMeasurement> m = builder.flush(); + assertTrue(m.isPresent()); + assertEquals(0, m.get().getProfileValue()); } }
