Repository: metron
Updated Branches:
  refs/heads/master 1d89c3183 -> 38d41fe67


METRON-1121 Ignore Profile with Bad 'init', 'update' or 'groupBy' Expressions 
(nickwallen) closes apache/metron#707


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/38d41fe6
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/38d41fe6
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/38d41fe6

Branch: refs/heads/master
Commit: 38d41fe67dee15728b50d930f20d8b19ad935009
Parents: 1d89c31
Author: nickwallen <[email protected]>
Authored: Thu Aug 24 10:08:14 2017 -0400
Committer: nickallen <[email protected]>
Committed: Thu Aug 24 10:08:14 2017 -0400

----------------------------------------------------------------------
 .../profiler/DefaultMessageDistributor.java     |   5 +-
 .../metron/profiler/DefaultProfileBuilder.java  |  75 +++---
 .../apache/metron/profiler/ProfileBuilder.java  |   4 +-
 .../profiler/DefaultProfileBuilderTest.java     | 261 +++++++++++++++++--
 4 files changed, 283 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/38d41fe6/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
 
b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
index ba3c0d8..53377a0 100644
--- 
a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
+++ 
b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
@@ -29,6 +29,7 @@ import org.json.simple.JSONObject;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -98,8 +99,8 @@ public class DefaultMessageDistributor implements 
MessageDistributor {
 
     profileCache.asMap().forEach((key, profileBuilder) -> {
       if(profileBuilder.isInitialized()) {
-        ProfileMeasurement measurement = profileBuilder.flush();
-        measurements.add(measurement);
+        Optional<ProfileMeasurement> measurement = profileBuilder.flush();
+        measurement.ifPresent(measurements::add);
       }
     });
 

http://git-wip-us.apache.org/repos/asf/metron/blob/38d41fe6/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java
 
b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java
index 41c31d7..3ceeed7 100644
--- 
a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java
+++ 
b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultProfileBuilder.java
@@ -29,6 +29,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.commons.collections4.ListUtils;
@@ -120,15 +121,18 @@ public class DefaultProfileBuilder implements 
ProfileBuilder, Serializable {
    * @param message The message to apply.
    */
   @Override
-  @SuppressWarnings("unchecked")
   public void apply(JSONObject message) {
+    try {
+      if (!isInitialized()) {
+        assign(definition.getInit(), message, "init");
+        isInitialized = true;
+      }
 
-    if(!isInitialized()) {
-      assign(definition.getInit(), message, "init");
-      isInitialized = true;
-    }
+      assign(definition.getUpdate(), message, "update");
 
-    assign(definition.getUpdate(), message, "update");
+    } catch(Throwable e) {
+      LOG.error(format("Unable to apply message to profile: %s", 
e.getMessage()), e);
+    }
   }
 
   /**
@@ -140,33 +144,41 @@ public class DefaultProfileBuilder implements 
ProfileBuilder, Serializable {
    * @return Returns the completed profile measurement.
    */
   @Override
-  public ProfileMeasurement flush() {
+  public Optional<ProfileMeasurement> flush() {
     LOG.debug("Flushing profile: profile={}, entity={}", profileName, entity);
+    Optional<ProfileMeasurement> result = Optional.empty();
 
-    // execute the 'profile' expression(s)
-    @SuppressWarnings("unchecked")
-    Object profileValue = 
execute(definition.getResult().getProfileExpressions().getExpression(), 
"result/profile");
-
-    // execute the 'triage' expression(s)
-    Map<String, Object> triageValues = 
definition.getResult().getTriageExpressions().getExpressions()
-            .entrySet()
-            .stream()
-            .collect(Collectors.toMap(
-                    e -> e.getKey(),
-                    e -> execute(e.getValue(), "result/triage")));
-
-    // execute the 'groupBy' expression(s) - can refer to value of 'result' 
expression
-    List<Object> groups = execute(definition.getGroupBy(), 
ImmutableMap.of("result", profileValue), "groupBy");
+    try {
+      // execute the 'profile' expression(s)
+      Object profileValue = 
execute(definition.getResult().getProfileExpressions().getExpression(), 
"result/profile");
+
+      // execute the 'triage' expression(s)
+      Map<String, Object> triageValues = 
definition.getResult().getTriageExpressions().getExpressions()
+              .entrySet()
+              .stream()
+              .collect(Collectors.toMap(
+                      e -> e.getKey(),
+                      e -> execute(e.getValue(), "result/triage")));
+
+      // execute the 'groupBy' expression(s) - can refer to value of 'result' 
expression
+      List<Object> groups = execute(definition.getGroupBy(), 
ImmutableMap.of("result", profileValue), "groupBy");
+
+      result = Optional.of(new ProfileMeasurement()
+              .withProfileName(profileName)
+              .withEntity(entity)
+              .withGroups(groups)
+              .withPeriod(clock.currentTimeMillis(), periodDurationMillis, 
TimeUnit.MILLISECONDS)
+              .withProfileValue(profileValue)
+              .withTriageValues(triageValues)
+              .withDefinition(definition));
+
+    } catch(Throwable e) {
+      // if any of the Stellar expressions fail, a measurement should NOT be 
returned
+      LOG.error(format("Unable to flush profile: %s", e.getMessage()), e);
+    }
 
     isInitialized = false;
-    return new ProfileMeasurement()
-            .withProfileName(profileName)
-            .withEntity(entity)
-            .withGroups(groups)
-            .withPeriod(clock.currentTimeMillis(), periodDurationMillis, 
TimeUnit.MILLISECONDS)
-            .withProfileValue(profileValue)
-            .withTriageValues(triageValues)
-            .withDefinition(definition);
+    return result;
   }
 
   /**
@@ -233,7 +245,7 @@ public class DefaultProfileBuilder implements 
ProfileBuilder, Serializable {
     } catch(ParseException e) {
 
       // make it brilliantly clear that one of the 'update' expressions is bad
-      String msg = format("Bad '%s' expression: %s, profile=%s, entity=%s", 
expressionType, e.getMessage(), profileName, entity);
+      String msg = format("Bad '%s' expression: error=%s, profile=%s, 
entity=%s", expressionType, e.getMessage(), profileName, entity);
       throw new ParseException(msg, e);
     }
   }
@@ -253,7 +265,8 @@ public class DefaultProfileBuilder implements 
ProfileBuilder, Serializable {
               .forEach((expr) -> results.add(executor.execute(expr, 
transientState, Object.class)));
 
     } catch (Throwable e) {
-      String msg = format("Bad '%s' expression: %s, profile=%s, entity=%s", 
expressionType, e.getMessage(), profileName, entity);
+      String msg = format("Bad '%s' expression: error=%s, profile=%s, 
entity=%s", expressionType, e.getMessage(), profileName, entity);
+      LOG.error(msg, e);
       throw new ParseException(msg, e);
     }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/38d41fe6/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
 
b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
index 9a5407b..d1fbc11 100644
--- 
a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
+++ 
b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/ProfileBuilder.java
@@ -23,6 +23,8 @@ package org.apache.metron.profiler;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
 import org.json.simple.JSONObject;
 
+import java.util.Optional;
+
 /**
  * Responsible for building and maintaining a Profile.
  *
@@ -49,7 +51,7 @@ public interface ProfileBuilder {
    *
    * @return Returns the completed profile measurement.
    */
-  ProfileMeasurement flush();
+  Optional<ProfileMeasurement> flush();
 
   /**
    * Has the ProfileBuilder been initialized?

http://git-wip-us.apache.org/repos/asf/metron/blob/38d41fe6/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java
 
b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java
index fa5c19c..7c8664a 100644
--- 
a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java
+++ 
b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultProfileBuilderTest.java
@@ -30,10 +30,13 @@ import org.json.simple.parser.JSONParser;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.metron.stellar.common.utils.ConversionUtils.convert;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Tests the ProfileBuilder class.
@@ -88,10 +91,11 @@ public class DefaultProfileBuilderTest {
 
     // execute
     builder.apply(message);
-    ProfileMeasurement m = builder.flush();
+    Optional<ProfileMeasurement> m = builder.flush();
+    assertTrue(m.isPresent());
 
     // validate that x = 100, y = 200
-    assertEquals(100 + 200, (int) convert(m.getProfileValue(), Integer.class));
+    assertEquals(100 + 200, (int) convert(m.get().getProfileValue(), 
Integer.class));
   }
 
   /**
@@ -110,10 +114,11 @@ public class DefaultProfileBuilderTest {
             .build();
 
     // execute
-    ProfileMeasurement m = builder.flush();
+    Optional<ProfileMeasurement> m = builder.flush();
+    assertTrue(m.isPresent());
 
     // validate that x = 0 and y = 0 as no initialization occurred
-    assertEquals(0, (int) convert(m.getProfileValue(), Integer.class));
+    assertEquals(0, (int) convert(m.get().getProfileValue(), Integer.class));
   }
 
   /**
@@ -153,10 +158,11 @@ public class DefaultProfileBuilderTest {
     for(int i=0; i<count; i++) {
       builder.apply(message);
     }
-    ProfileMeasurement m = builder.flush();
+    Optional<ProfileMeasurement> m = builder.flush();
+    assertTrue(m.isPresent());
 
     // validate that x=0, y=0 then x+=1, y+=2 for each message
-    assertEquals(count*1 + count*2, (int) convert(m.getProfileValue(), 
Integer.class));
+    assertEquals(count*1 + count*2, (int) convert(m.get().getProfileValue(), 
Integer.class));
   }
 
   /**
@@ -186,10 +192,11 @@ public class DefaultProfileBuilderTest {
 
     // execute
     builder.apply(message);
-    ProfileMeasurement m = builder.flush();
+    Optional<ProfileMeasurement> m = builder.flush();
+    assertTrue(m.isPresent());
 
     // validate
-    assertEquals(100, (int) convert(m.getProfileValue(), Integer.class));
+    assertEquals(100, (int) convert(m.get().getProfileValue(), Integer.class));
   }
 
   /**
@@ -213,11 +220,12 @@ public class DefaultProfileBuilderTest {
     {
       // apply a message and flush
       builder.apply(message);
-      ProfileMeasurement m = builder.flush();
+      Optional<ProfileMeasurement> m = builder.flush();
+      assertTrue(m.isPresent());
 
       // validate the profile period
       ProfilePeriod expected = new ProfilePeriod(clock.currentTimeMillis(), 
10, TimeUnit.MINUTES);
-      assertEquals(expected, m.getPeriod());
+      assertEquals(expected, m.get().getPeriod());
     }
     {
       // advance time by at least one period - 10 minutes
@@ -225,11 +233,12 @@ public class DefaultProfileBuilderTest {
 
       // apply a message and flush again
       builder.apply(message);
-      ProfileMeasurement m = builder.flush();
+      Optional<ProfileMeasurement> m = builder.flush();
+      assertTrue(m.isPresent());
 
       // validate the profile period
       ProfilePeriod expected = new ProfilePeriod(clock.currentTimeMillis(), 
10, TimeUnit.MINUTES);
-      assertEquals(expected, m.getPeriod());
+      assertEquals(expected, m.get().getPeriod());
     }
   }
 
@@ -262,12 +271,13 @@ public class DefaultProfileBuilderTest {
 
     // execute
     builder.apply(message);
-    ProfileMeasurement m = builder.flush();
+    Optional<ProfileMeasurement> m = builder.flush();
+    assertTrue(m.isPresent());
 
     // validate
-    assertEquals(2, m.getGroups().size());
-    assertEquals(100, m.getGroups().get(0));
-    assertEquals(200, m.getGroups().get(1));
+    assertEquals(2, m.get().getGroups().size());
+    assertEquals(100, m.get().getGroups().get(0));
+    assertEquals(200, m.get().getGroups().get(1));
   }
 
   /**
@@ -308,10 +318,11 @@ public class DefaultProfileBuilderTest {
 
     // apply another message to accumulate new state, then flush again to 
validate original state was cleared
     builder.apply(message);
-    ProfileMeasurement m = builder.flush();
+    Optional<ProfileMeasurement> m = builder.flush();
+    assertTrue(m.isPresent());
 
     // validate
-    assertEquals(33, m.getProfileValue());
+    assertEquals(33, m.get().getProfileValue());
   }
 
   /**
@@ -352,10 +363,11 @@ public class DefaultProfileBuilderTest {
 
     // apply another message to accumulate new state, then flush again to 
validate original state was cleared
     builder.apply(message);
-    ProfileMeasurement m = builder.flush();
+    Optional<ProfileMeasurement> m = builder.flush();
+    assertTrue(m.isPresent());
 
     // validate
-    assertEquals(3, m.getProfileValue());
+    assertEquals(3, m.get().getProfileValue());
   }
   /**
    * {
@@ -384,10 +396,11 @@ public class DefaultProfileBuilderTest {
 
     // execute
     builder.apply(message);
-    ProfileMeasurement m = builder.flush();
+    Optional<ProfileMeasurement> m = builder.flush();
+    assertTrue(m.isPresent());
 
     // validate
-    assertEquals(entity, m.getEntity());
+    assertEquals(entity, m.get().getEntity());
   }
 
   /**
@@ -421,10 +434,11 @@ public class DefaultProfileBuilderTest {
 
     // execute
     builder.apply(message);
-    ProfileMeasurement m = builder.flush();
+    Optional<ProfileMeasurement> m = builder.flush();
+    assertTrue(m.isPresent());
 
     // validate
-    assertEquals(100, m.getProfileValue());
+    assertEquals(100, m.get().getProfileValue());
   }
 
   /**
@@ -462,11 +476,202 @@ public class DefaultProfileBuilderTest {
 
     // execute
     builder.apply(message);
-    ProfileMeasurement m = builder.flush();
+    Optional<ProfileMeasurement> m = builder.flush();
+    assertTrue(m.isPresent());
 
     // validate
-    assertEquals(0, m.getTriageValues().get("zero"));
-    assertEquals(100, m.getTriageValues().get("hundred"));
-    assertEquals(100, m.getProfileValue());
+    assertEquals(0, m.get().getTriageValues().get("zero"));
+    assertEquals(100, m.get().getTriageValues().get("hundred"));
+    assertEquals(100, m.get().getProfileValue());
+  }
+
+  /**
+   * {
+   *   "profile": "bad-init",
+   *   "foreach": "ip_src_addr",
+   *   "init":   { "x": "2 / 0" },
+   *   "update": { "x": "x + 1" },
+   *   "result": "x + y",
+   *   "groupBy": ["cheese"]
+   * }
+   */
+  @Multiline
+  private String badInitProfile;
+
+  @Test
+  public void testBadInitExpression() throws Exception {
+    // setup
+    definition = JSONUtils.INSTANCE.load(badInitProfile, ProfileConfig.class);
+    builder = new DefaultProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity("10.0.0.1")
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .withContext(Context.EMPTY_CONTEXT())
+            .build();
+
+    // due to the bad expression, there should be no result
+    builder.apply(message);
+    assertFalse(builder.flush().isPresent());
+  }
+
+  /**
+   * {
+   *   "profile": "bad-simple-result",
+   *   "foreach": "ip_src_addr",
+   *   "init":   { "x": "0" },
+   *   "update": { "x": "x + 1" },
+   *   "result": "2 / 0",
+   *   "groupBy": ["cheese"]
+   * }
+   */
+  @Multiline
+  private String badSimpleResultProfile;
+
+  @Test
+  public void testBadResultExpression() throws Exception {
+    // setup
+    definition = JSONUtils.INSTANCE.load(badSimpleResultProfile, 
ProfileConfig.class);
+    builder = new DefaultProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity("10.0.0.1")
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .withContext(Context.EMPTY_CONTEXT())
+            .build();
+
+    // due to the bad expression, there should be no result
+    builder.apply(message);
+    assertFalse(builder.flush().isPresent());
+  }
+
+  /**
+   * {
+   *   "profile": "bad-groupBy",
+   *   "foreach": "ip_src_addr",
+   *   "init":   { "x": "0" },
+   *   "update": { "x": "x + 1" },
+   *   "result": "x",
+   *   "groupBy": ["2 / 0"]
+   * }
+   */
+  @Multiline
+  private String badGroupByProfile;
+
+  @Test
+  public void testBadGroupByExpression() throws Exception {
+    // setup
+    definition = JSONUtils.INSTANCE.load(badGroupByProfile, 
ProfileConfig.class);
+    builder = new DefaultProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity("10.0.0.1")
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .withContext(Context.EMPTY_CONTEXT())
+            .build();
+
+    // due to the bad expression, there should be no result
+    builder.apply(message);
+    assertFalse(builder.flush().isPresent());
+  }
+
+  /**
+   * {
+   *   "profile": "bad-result-profile",
+   *   "foreach": "ip_src_addr",
+   *   "init": { "x": "100" },
+   *   "result": {
+   *      "profile": "2 / 0",
+   *      "triage": {
+   *        "zero": "x - 100",
+   *        "hundred": "x"
+   *      }
+   *   }
+   * }
+   */
+  @Multiline
+  private String badResultProfile;
+
+  @Test
+  public void testBadResultProfileExpression() throws Exception {
+    // setup
+    definition = JSONUtils.INSTANCE.load(badResultProfile, 
ProfileConfig.class);
+    builder = new DefaultProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity("10.0.0.1")
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .withContext(Context.EMPTY_CONTEXT())
+            .build();
+
+    // due to the bad expression, there should be no result
+    builder.apply(message);
+    assertFalse(builder.flush().isPresent());
+  }
+
+  /**
+   * {
+   *   "profile": "bad-result-triage",
+   *   "foreach": "ip_src_addr",
+   *   "init": { "x": "100" },
+   *   "result": {
+   *      "profile": "x",
+   *      "triage": {
+   *        "zero": "x - 100",
+   *        "hundred": "2 / 0"
+   *      }
+   *   }
+   * }
+   */
+  @Multiline
+  private String badResultTriage;
+
+  @Test
+  public void testBadResultTriageExpression() throws Exception {
+    // setup
+    definition = JSONUtils.INSTANCE.load(badResultTriage, ProfileConfig.class);
+    builder = new DefaultProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity("10.0.0.1")
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .withContext(Context.EMPTY_CONTEXT())
+            .build();
+
+    // due to the bad expression, there should be no result
+    builder.apply(message);
+    assertFalse(builder.flush().isPresent());
+  }
+
+  /**
+   * {
+   *   "profile": "bad-update",
+   *   "foreach": "ip_src_addr",
+   *   "init":   { "x": "0" },
+   *   "update": { "x": "x + (2/0)" },
+   *   "result": "x"
+   * }
+   */
+  @Multiline
+  private String badUpdateProfile;
+
+  /**
+   * If the 'init' expression succeeds, but the 'update' fails, the profile 
should still flush.  We cannot
+   * be sure if the 'update' is failing on every message or just one.  Since 
that is the case, the profile
+   * flushes whatever data it has.
+   */
+  @Test
+  public void testBadUpdateExpression() throws Exception {
+    // setup
+    definition = JSONUtils.INSTANCE.load(badUpdateProfile, 
ProfileConfig.class);
+    builder = new DefaultProfileBuilder.Builder()
+            .withDefinition(definition)
+            .withEntity("10.0.0.1")
+            .withPeriodDuration(10, TimeUnit.MINUTES)
+            .withContext(Context.EMPTY_CONTEXT())
+            .build();
+
+    // execute
+    builder.apply(message);
+
+    // if the update expression fails, the profile should still flush.
+    Optional<ProfileMeasurement> m = builder.flush();
+    assertTrue(m.isPresent());
+    assertEquals(0, m.get().getProfileValue());
   }
 }

Reply via email to