Repository: metron
Updated Branches:
  refs/heads/master 575ba03b9 -> d599efb08


METRON-1874 Create a Parser Debugger (nickwallen) closes apache/metron#1265


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/d599efb0
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/d599efb0
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/d599efb0

Branch: refs/heads/master
Commit: d599efb08021a85317680ef52ad2e50405621647
Parents: 575ba03
Author: nickwallen <[email protected]>
Authored: Tue Nov 20 13:54:45 2018 -0500
Committer: nickallen <[email protected]>
Committed: Tue Nov 20 13:54:45 2018 -0500

----------------------------------------------------------------------
 .../org/apache/metron/management/Functions.java |  72 +++++
 .../metron/management/KafkaFunctions.java       |  19 +-
 .../metron/management/ParserFunctions.java      | 161 +++++++++++
 .../metron/management/StellarParserRunner.java  | 155 +++++++++++
 .../metron/management/ParserFunctionsTest.java  | 264 +++++++++++++++++++
 .../management/StellarParserRunnerTest.java     | 148 +++++++++++
 6 files changed, 801 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/d599efb0/metron-platform/metron-management/src/main/java/org/apache/metron/management/Functions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/main/java/org/apache/metron/management/Functions.java b/metron-platform/metron-management/src/main/java/org/apache/metron/management/Functions.java
new file mode 100644
index 0000000..5e591aa
--- /dev/null
+++ b/metron-platform/metron-management/src/main/java/org/apache/metron/management/Functions.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.management;
+
+import org.apache.metron.stellar.common.utils.ConversionUtils;
+
+import java.util.List;
+
+import static java.lang.String.format;
+
+/**
+ * Contains utility functionality that is useful across all of the Stellar 
management functions.
+ */
+public class Functions {
+
+  /**
+   * Get an argument from the Stellar function arguments
+   *
+   * @param argName The name of the argument.
+   * @param index The index within the list of arguments.
+   * @param clazz The type expected.
+   * @param args All of the arguments.
+   * @param <T> The type of the argument expected.
+   */
+  public static <T> T getArg(String argName, int index, Class<T> clazz, 
List<Object> args) {
+    if(index >= args.size()) {
+      String msg = format("missing '%s'; expected at least %d argument(s), 
found %d", argName, index+1, args.size());
+      throw new IllegalArgumentException(msg);
+    }
+
+    return ConversionUtils.convert(args.get(index), clazz);
+  }
+
+  /**
+   * Returns true if an argument of a specific type at a given index exists.  
Otherwise returns
+   * false if an argument does not exist at the index or is not the expected 
type.
+   *
+   * @param argName The name of the argument.
+   * @param index The index within the list of arguments.
+   * @param clazz The type expected.
+   * @param args All of the arguments.
+   * @param <T> The type of argument expected.
+   */
+  public static <T> boolean hasArg(String argName, int index, Class<T> clazz, 
List<Object> args) {
+    boolean result = false;
+
+    if(args.size() > index) {
+      if(clazz.isAssignableFrom(args.get(index).getClass())) {
+        return true;
+      }
+    }
+
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/d599efb0/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
index 76418b6..78026fb 100644
--- a/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
+++ b/metron-platform/metron-management/src/main/java/org/apache/metron/management/KafkaFunctions.java
@@ -60,6 +60,7 @@ import java.util.concurrent.TimeoutException;
 
 import static java.lang.String.format;
 import static org.apache.metron.stellar.dsl.Context.Capabilities.GLOBAL_CONFIG;
+import static org.apache.metron.management.Functions.getArg;
 
 /**
  * Defines the following Kafka-related functions available in Stellar.
@@ -1054,22 +1055,4 @@ public class KafkaFunctions {
 
     return properties;
   }
-
-  /**
-   * Get an argument from a list of arguments.
-   *
-   * @param argName The name of the argument.
-   * @param index The index within the list of arguments.
-   * @param clazz The type expected.
-   * @param args All of the arguments.
-   * @param <T> The type of the argument expected.
-   */
-  public static <T> T getArg(String argName, int index, Class<T> clazz, List<Object> args) {
-    if(index >= args.size()) {
-      throw new IllegalArgumentException(format("missing '%s'; expected at least %d argument(s), found %d",
-              argName, index+1, args.size()));
-    }
-
-    return ConversionUtils.convert(args.get(index), clazz);
-  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/d599efb0/metron-platform/metron-management/src/main/java/org/apache/metron/management/ParserFunctions.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/main/java/org/apache/metron/management/ParserFunctions.java b/metron-platform/metron-management/src/main/java/org/apache/metron/management/ParserFunctions.java
new file mode 100644
index 0000000..6a02812
--- /dev/null
+++ b/metron-platform/metron-management/src/main/java/org/apache/metron/management/ParserFunctions.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.management;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.metron.stellar.dsl.BaseStellarFunction;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.ParseException;
+import org.apache.metron.stellar.dsl.Stellar;
+import org.apache.metron.stellar.dsl.StellarFunction;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import static java.lang.String.format;
+import static org.apache.metron.management.Functions.getArg;
+import static org.apache.metron.management.Functions.hasArg;
+
+/**
+ * Stellar functions that allow the user to parse messages in the Stellar REPL.
+ */
+public class ParserFunctions {
+
+  @Stellar(
+          namespace = "PARSER",
+          name = "INIT",
+          description = "Initialize a parser to parse messages.",
+          params = {
+                  "sensorType - The type of sensor to parse.",
+                  "config - The parser configuration.",
+                  "globals - Optional; a map of global configuration values."
+          },
+          returns = "A parser that can be used to parse messages."
+  )
+  public static class InitializeFunction extends BaseStellarFunction {
+
+    /**
+     * Creates a {@link StellarParserRunner} for a sensor type.
+     *
+     * @param args The function arguments; [0] the sensor type, [1] the parser configuration
+     *             as either a JSON string or a map, [2] an optional map of globals.
+     * @return The parser runner that was created.
+     * @throws IllegalArgumentException If the sensor type or parser configuration is missing.
+     */
+    @Override
+    public Object apply(List<Object> args) throws ParseException {
+
+      String sensorType = getArg("sensorType", 0, String.class, args);
+      StellarParserRunner parser = new StellarParserRunner(sensorType);
+
+      // handle the parser configuration argument
+      String configArgName = "config";
+      if(hasArg(configArgName, 1, String.class, args)) {
+        // parser config passed in as a string
+        String config = getArg(configArgName, 1, String.class, args);
+        parser.withParserConfiguration(config);
+
+      } else {
+        // parser configuration passed in as a map
+        @SuppressWarnings("unchecked")
+        Map<String, Object> config = getArg(configArgName, 1, Map.class, args);
+        parser.withParserConfiguration(config);
+      }
+
+      // the optional 'globals' argument follows 'config', so it lives at index 2; reading
+      // index 1 would ignore the globals and re-apply a map-typed 'config' as the globals
+      if(hasArg("globals", 2, Map.class, args)) {
+        @SuppressWarnings("unchecked")
+        Map<String, Object> globals = getArg("globals", 2, Map.class, args);
+        parser.withGlobals(globals);
+      }
+
+      return parser;
+    }
+  }
+
+  @Stellar(
+          namespace = "PARSER",
+          name = "PARSE",
+          description = "Parse a message.",
+          params = {
+                  "parser - The parser created with PARSER_INIT.",
+                  "input - A message or list of messages to parse."
+          },
+          returns = "A list of messages that result from parsing the input."
+  )
+  public static class ParseFunction implements StellarFunction {
+
+    /**
+     * Hands the execution context to the parser and then parses the input.
+     *
+     * @param args The function arguments; [0] the parser, [1] a message or a list of messages.
+     * @param context The Stellar execution context.
+     * @return Whatever the parser returns for the given messages.
+     */
+    @Override
+    public Object apply(List<Object> args, Context context) throws ParseException {
+      StellarParserRunner parser = getArg("parser", 0, StellarParserRunner.class, args);
+      parser.withContext(context);
+      return parser.parse(getMessages(args));
+    }
+
+    /**
+     * Extracts the messages to parse from the second function argument.
+     *
+     * @param args The function arguments.
+     * @return The messages to parse; one element when a single string was passed in.
+     */
+    private List<String> getMessages(List<Object> args) {
+      final String inputArgName = "input";
+      List<String> toParse = new ArrayList<>();
+
+      // a lone message arrives as a plain string
+      if(hasArg(inputArgName, 1, String.class, args)) {
+        toParse.add(getArg(inputArgName, 1, String.class, args));
+        return toParse;
+      }
+
+      // anything other than a list is rejected
+      if(!hasArg(inputArgName, 1, List.class, args)) {
+        throw new IllegalArgumentException("Expected a string or list of strings to parse.");
+      }
+
+      // several messages arrive as a list; each element is cast to a string
+      List<Object> inputs = getArg(inputArgName, 1, List.class, args);
+      for(Object input: inputs) {
+        toParse.add(String.class.cast(input));
+      }
+      return toParse;
+    }
+
+    @Override
+    public void initialize(Context context) {
+      // there is no state to set up
+    }
+
+    @Override
+    public boolean isInitialized() {
+      return true;
+    }
+  }
+
+  @Stellar(
+          namespace = "PARSER",
+          name = "CONFIG",
+          description = "Returns the configuration of the parser",
+          params = {
+                  "parser - The parser created with PARSER_INIT."
+          },
+          returns = "The parser configuration."
+  )
+  public static class ConfigFunction extends BaseStellarFunction {
+
+    /**
+     * Looks up the parser in the first argument and returns its configuration as JSON.
+     *
+     * @param args The function arguments; [0] the parser.
+     * @return The value of the parser's {@code toJSON()}.
+     */
+    @Override
+    public Object apply(List<Object> args) {
+      return getArg("parser", 0, StellarParserRunner.class, args).toJSON();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/d599efb0/metron-platform/metron-management/src/main/java/org/apache/metron/management/StellarParserRunner.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/main/java/org/apache/metron/management/StellarParserRunner.java b/metron-platform/metron-management/src/main/java/org/apache/metron/management/StellarParserRunner.java
new file mode 100644
index 0000000..38dba39
--- /dev/null
+++ b/metron-platform/metron-management/src/main/java/org/apache/metron/management/StellarParserRunner.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.management;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.parsers.ParserRunnerImpl;
+import org.apache.metron.parsers.ParserRunnerResults;
+import org.apache.metron.stellar.dsl.Context;
+import org.json.simple.JSONObject;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.metron.common.message.metadata.RawMessageStrategies.DEFAULT;
+
+/**
+ * Enables parsing of messages in the Stellar REPL.
+ *
+ * <p>Maintains all of the state between subsequent executions of the parser 
functions.
+ */
+public class StellarParserRunner {
+
+    private String sensorType;
+    private ParserConfigurations parserConfigurations;
+    private Context context;
+    private int successCount;
+    private int errorCount;
+
+    /**
+     * @param sensorType The sensor type of the messages to parse.
+     */
+    public StellarParserRunner(String sensorType) {
+        this.sensorType = sensorType;
+        this.successCount = 0;
+        this.errorCount = 0;
+    }
+
+    public List<JSONObject> parse(List<String> messages) {
+        if(parserConfigurations == null) {
+            throw new IllegalArgumentException("Missing required parser 
configuration");
+        }
+        if(context == null) {
+            throw new IllegalArgumentException("Missing required context");
+        }
+        return doParse(messages);
+    }
+
+    private List<JSONObject> doParse(List<String> messages) {
+        // initialize
+        HashSet<String> sensorTypes = new HashSet<>();
+        sensorTypes.add(sensorType);
+        ParserRunnerImpl runner = new ParserRunnerImpl(sensorTypes);
+        runner.init(() -> parserConfigurations, context);
+
+        // parse each message
+        List<ParserRunnerResults<JSONObject>> results = messages
+                .stream()
+                .map(str -> str.getBytes())
+                .map(bytes -> DEFAULT.get(emptyMap(), bytes, false, 
emptyMap()))
+                .map(msg -> runner.execute(sensorType, msg, 
parserConfigurations))
+                .collect(Collectors.toList());
+
+        // aggregate both successes and errors into a list that can be returned
+        List<JSONObject> successes = results
+                .stream()
+                .flatMap(result -> result.getMessages().stream())
+                .collect(Collectors.toList());
+        successCount += successes.size();
+
+        List<JSONObject> errors = results
+                .stream()
+                .flatMap(result -> result.getErrors().stream())
+                .map(err -> err.getJSONObject())
+                .collect(Collectors.toList());
+        errorCount += errors.size();
+
+        // return a list of both successes and errors
+        successes.addAll(errors);
+        return successes;
+    }
+
+    public StellarParserRunner withParserConfiguration(String sensorConfig) {
+        parserConfigurations = create(sensorConfig.getBytes());
+        return this;
+    }
+
+    public StellarParserRunner withParserConfiguration(Map<String, Object> 
config) {
+        parserConfigurations = create(new 
JSONObject(config).toJSONString().getBytes());
+        return this;
+    }
+
+    public StellarParserRunner withContext(Context context) {
+        this.context = context;
+        return this;
+    }
+
+    public StellarParserRunner withGlobals(Map<String, Object> globals) {
+        parserConfigurations.updateGlobalConfig(globals);
+        return this;
+    }
+
+    /**
+     * @return The JSON configuration of the parser.
+     */
+    public String toJSON() {
+        try {
+            return 
parserConfigurations.getSensorParserConfig(sensorType).toJSON();
+        } catch(JsonProcessingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public ParserConfigurations getParserConfigurations() {
+        return parserConfigurations;
+    }
+
+    private ParserConfigurations create(byte[] sensorConfig) {
+        try {
+            ParserConfigurations result = new ParserConfigurations();
+            result.updateSensorParserConfig(sensorType, 
SensorParserConfig.fromBytes(sensorConfig));
+            return result;
+
+        } catch(IOException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+    @Override
+    public String toString() {
+        // this is is displayed in the REPL; nothing useful to show
+        return String.format("Parser{%d successful, %d error(s)}", 
successCount, errorCount);
+    }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/d599efb0/metron-platform/metron-management/src/test/java/org/apache/metron/management/ParserFunctionsTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ParserFunctionsTest.java b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ParserFunctionsTest.java
new file mode 100644
index 0000000..aed8b85
--- /dev/null
+++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ParserFunctionsTest.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.metron.management;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.Constants;
+import org.apache.metron.common.configuration.ParserConfigurations;
+import org.apache.metron.common.configuration.SensorParserConfig;
+import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
+import org.apache.metron.stellar.common.StellarStatefulExecutor;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
+import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
+import org.json.simple.JSONObject;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.invoke.MethodHandles;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.metron.common.Constants.ErrorFields.ERROR_HASH;
+import static org.apache.metron.common.Constants.ErrorFields.ERROR_TYPE;
+import static org.apache.metron.common.Constants.ErrorFields.EXCEPTION;
+import static org.apache.metron.common.Constants.ErrorFields.MESSAGE;
+import static org.apache.metron.common.Constants.ErrorFields.STACK;
+import static org.apache.metron.common.Constants.Fields.DST_ADDR;
+import static org.apache.metron.common.Constants.Fields.DST_PORT;
+import static org.apache.metron.common.Constants.Fields.SRC_ADDR;
+import static org.apache.metron.common.Constants.Fields.SRC_PORT;
+
+/**
+ * Tests the {@link ParserFunctions} class.
+ */
+public class ParserFunctionsTest {
+
+  static final Logger LOG = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+  FunctionResolver functionResolver;
+  Map<String, Object> variables;
+  Context context = null;
+  StellarStatefulExecutor executor;
+
+  @Before
+  public void setup() {
+    variables = new HashMap<>();
+    functionResolver = new SimpleFunctionResolver()
+            .withClass(ParserFunctions.ParseFunction.class)
+            .withClass(ParserFunctions.InitializeFunction.class)
+            .withClass(ParserFunctions.ConfigFunction.class);
+    context = new Context.Builder().build();
+    executor = new DefaultStellarStatefulExecutor(functionResolver, context);
+  }
+
+  /**
+   * {
+   *  "dns": {
+   *  "ts":1402308259.609,
+   *  "uid":"CuJT272SKaJSuqO0Ia",
+   *  "id.orig_h":"10.122.196.204",
+   *  "id.orig_p":33976,
+   *  "id.resp_h":"144.254.71.184",
+   *  "id.resp_p":53,
+   *  "proto":"udp",
+   *  "trans_id":62418,
+   *  "query":"www.cisco.com",
+   *  "qclass":1,
+   *  "qclass_name":"C_INTERNET",
+   *  "qtype":28,
+   *  "qtype_name":"AAAA",
+   *  "rcode":0,
+   *  "rcode_name":"NOERROR",
+   *  "AA":true,
+   *  "TC":false,
+   *  "RD":true,
+   *  "RA":true,
+   *  "Z":0,
+   *  
"answers":["www.cisco.com.akadns.net","origin-www.cisco.com","2001:420:1201:2::a"],
+   *  "TTLs":[3600.0,289.0,14.0],
+   *  "rejected":false
+   *  }
+   * }
+   */
+  @Multiline
+  public String broMessage;
+
+  /**
+   * {
+   *  "parserClassName":"org.apache.metron.parsers.bro.BasicBroParser",
+   *  "filterClassName":"org.apache.metron.parsers.filters.StellarFilter",
+   *  "sensorTopic":"bro"
+   * }
+   */
+  @Multiline
+  private String broParserConfig;
+
+  @Test
+  public void testParseBroMessage() {
+    // initialize the parser with the sensor config
+    set("config", broParserConfig);
+    assign("parser", "PARSER_INIT('bro', config)");
+
+    // parse the message
+    set("message", broMessage);
+    List<JSONObject> messages = execute("PARSER_PARSE(parser, message)", 
List.class);
+
+    // validate the parsed message
+    Assert.assertEquals(1, messages.size());
+    JSONObject message = messages.get(0);
+    Assert.assertEquals("bro", message.get(Constants.SENSOR_TYPE));
+    Assert.assertEquals("10.122.196.204", message.get(SRC_ADDR.getName()));
+    Assert.assertEquals(33976L, message.get(SRC_PORT.getName()));
+    Assert.assertEquals("144.254.71.184", message.get(DST_ADDR.getName()));
+    Assert.assertEquals(53L, message.get(DST_PORT.getName()));
+    Assert.assertEquals("dns", message.get("protocol"));
+  }
+
+  @Test
+  public void testParseMultipleMessages() {
+    // initialize the parser with the sensor config
+    set("config", broParserConfig);
+    assign("parser", "PARSER_INIT('bro', config)");
+
+    // parse the message
+    set("msg1", broMessage);
+    set("msg2", broMessage);
+    set("msg3", broMessage);
+    List<JSONObject> messages = execute("PARSER_PARSE(parser, [msg1, msg2, 
msg3])", List.class);
+
+    // expect a parsed message
+    Assert.assertEquals(3, messages.size());
+    for(JSONObject message: messages) {
+      Assert.assertEquals("bro", message.get(Constants.SENSOR_TYPE));
+      Assert.assertTrue(message.containsKey(Constants.GUID));
+      Assert.assertEquals("10.122.196.204", message.get(SRC_ADDR.getName()));
+      Assert.assertEquals(33976L, message.get(SRC_PORT.getName()));
+      Assert.assertEquals("144.254.71.184", message.get(DST_ADDR.getName()));
+      Assert.assertEquals(53L, message.get(DST_PORT.getName()));
+      Assert.assertEquals("dns", message.get("protocol"));
+    }
+  }
+
+  @Test
+  public void testParseInvalidMessage() {
+    // initialize the parser with the sensor config
+    set("config", broParserConfig);
+    assign("parser", "PARSER_INIT('bro', config)");
+
+    // parse the message
+    String invalidMessage = "{ this is an invalid message }}";
+    set("message", invalidMessage);
+    List<JSONObject> messages = execute("PARSER_PARSE(parser, message)", 
List.class);
+
+    // validate the parsed message
+    Assert.assertEquals(1, messages.size());
+
+    // expect an error message to be returned
+    JSONObject error = messages.get(0);
+    Assert.assertEquals(invalidMessage, error.get("raw_message"));
+    Assert.assertEquals(Constants.ERROR_TYPE, 
error.get(Constants.SENSOR_TYPE));
+    Assert.assertEquals("parser_error", error.get(ERROR_TYPE.getName()));
+    Assert.assertTrue(error.containsKey(MESSAGE.getName()));
+    Assert.assertTrue(error.containsKey(EXCEPTION.getName()));
+    Assert.assertTrue(error.containsKey(STACK.getName()));
+    Assert.assertTrue(error.containsKey(ERROR_HASH.getName()));
+    Assert.assertTrue(error.containsKey(Constants.GUID));
+  }
+
+  @Test
+  public void testParseSomeGoodSomeBadMessages() {
+    // initialize the parser with the sensor config
+    set("config", broParserConfig);
+    assign("parser", "PARSER_INIT('bro', config)");
+
+    // parse the message
+    String invalidMessage = "{ this is an invalid message }}";
+    set("msg1", broMessage);
+    set("msg2", invalidMessage);
+    List<JSONObject> messages = execute("PARSER_PARSE(parser, [msg1, msg2])", 
List.class);
+
+    // expect 2 messages to be returned - 1 success and 1 error
+    Assert.assertEquals(2, messages.size());
+    Assert.assertEquals(1, messages.stream().filter(msg -> 
isBro(msg)).count());
+    Assert.assertEquals(1, messages.stream().filter(msg -> 
isError(msg)).count());
+  }
+
+  @Test
+  public void testConfig() throws Exception {
+    // initialize the parser
+    set("config", broParserConfig);
+    assign("parser", "PARSER_INIT('bro', config)");
+
+    String config = execute("PARSER_CONFIG(parser)", String.class);
+    Assert.assertNotNull(config);
+    Assert.assertNotNull(SensorParserConfig.fromBytes(config.getBytes()));
+  }
+
+  private boolean isError(JSONObject message) {
+    String sensorType = String.class.cast(message.get(Constants.SENSOR_TYPE));
+    return Constants.ERROR_TYPE.equals(sensorType);
+  }
+
+  private boolean isBro(JSONObject message) {
+    String sensorType = String.class.cast(message.get(Constants.SENSOR_TYPE));
+    return "bro".equals(sensorType);
+  }
+
+  /**
+   * Set the value of a variable.
+   *
+   * @param var The variable to assign.
+   * @param value The value to assign.
+   */
+  private void set(String var, Object value) {
+    executor.assign(var, value);
+  }
+
+  /**
+   * Assign a value to the result of an expression.
+   *
+   * @param var The variable to assign.
+   * @param expression The expression to execute.
+   */
+  private Object assign(String var, String expression) {
+    executor.assign(var, expression, Collections.emptyMap());
+    return executor.getState().get(var);
+  }
+
+  /**
+   * Execute a Stellar expression.
+   *
+   * @param expression The Stellar expression to execute.
+   * @param clazz
+   * @param <T>
+   * @return The result of executing the Stellar expression.
+   */
+  private <T> T execute(String expression, Class<T> clazz) {
+    T results = executor.execute(expression, Collections.emptyMap(), clazz);
+    LOG.debug("{} = {}", expression, results);
+    return results;
+  }
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/d599efb0/metron-platform/metron-management/src/test/java/org/apache/metron/management/StellarParserRunnerTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/StellarParserRunnerTest.java b/metron-platform/metron-management/src/test/java/org/apache/metron/management/StellarParserRunnerTest.java
new file mode 100644
index 0000000..07168b7
--- /dev/null
+++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/StellarParserRunnerTest.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.management;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.metron.common.Constants;
+import org.apache.metron.stellar.dsl.Context;
+import org.json.simple.JSONObject;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.metron.common.Constants.ErrorFields.ERROR_HASH;
+import static org.apache.metron.common.Constants.ErrorFields.ERROR_TYPE;
+import static org.apache.metron.common.Constants.ErrorFields.EXCEPTION;
+import static org.apache.metron.common.Constants.ErrorFields.MESSAGE;
+import static org.apache.metron.common.Constants.ErrorFields.STACK;
+import static org.apache.metron.common.Constants.Fields.DST_ADDR;
+import static org.apache.metron.common.Constants.Fields.DST_PORT;
+import static org.apache.metron.common.Constants.Fields.SRC_ADDR;
+import static org.apache.metron.common.Constants.Fields.SRC_PORT;
+
+public class StellarParserRunnerTest {
+
+    /**
+     * {
+     *  "dns": {
+     *  "ts":1402308259.609,
+     *  "uid":"CuJT272SKaJSuqO0Ia",
+     *  "id.orig_h":"10.122.196.204",
+     *  "id.orig_p":33976,
+     *  "id.resp_h":"144.254.71.184",
+     *  "id.resp_p":53,
+     *  "proto":"udp",
+     *  "trans_id":62418,
+     *  "query":"www.cisco.com",
+     *  "qclass":1,
+     *  "qclass_name":"C_INTERNET",
+     *  "qtype":28,
+     *  "qtype_name":"AAAA",
+     *  "rcode":0,
+     *  "rcode_name":"NOERROR",
+     *  "AA":true,
+     *  "TC":false,
+     *  "RD":true,
+     *  "RA":true,
+     *  "Z":0,
+     *  
"answers":["www.cisco.com.akadns.net","origin-www.cisco.com","2001:420:1201:2::a"],
+     *  "TTLs":[3600.0,289.0,14.0],
+     *  "rejected":false
+     *  }
+     * }
+     */
+    @Multiline
+    public String broMessage;
+
+    /**
+     * {
+     *  "parserClassName":"org.apache.metron.parsers.bro.BasicBroParser",
+     *  "filterClassName":"org.apache.metron.parsers.filters.StellarFilter",
+     *  "sensorTopic":"bro"
+     * }
+     */
+    @Multiline
+    private String broParserConfig;
+
+    @Test
+    public void testParseMessage() {
+        List<String> toParse = new ArrayList<>();
+        toParse.add(broMessage);
+        toParse.add(broMessage);
+        toParse.add(broMessage);
+
+        // parse the messages
+        StellarParserRunner runner = new StellarParserRunner("bro")
+                .withParserConfiguration(broParserConfig)
+                .withContext(Context.EMPTY_CONTEXT());
+        List<JSONObject> messages = runner.parse(toParse);
+
+        // expect 3 successfully parsed message
+        Assert.assertEquals(3, messages.size());
+        for(JSONObject message: messages) {
+            Assert.assertEquals("bro", message.get(Constants.SENSOR_TYPE));
+            Assert.assertTrue(message.containsKey(Constants.GUID));
+            Assert.assertEquals("10.122.196.204", 
message.get(SRC_ADDR.getName()));
+            Assert.assertEquals(33976L, message.get(SRC_PORT.getName()));
+            Assert.assertEquals("144.254.71.184", 
message.get(DST_ADDR.getName()));
+            Assert.assertEquals(53L, message.get(DST_PORT.getName()));
+            Assert.assertEquals("dns", message.get("protocol"));
+        }
+    }
+
+    @Test
+    public void testParseInvalidMessage() {
+        List<String> toParse = new ArrayList<>();
+        toParse.add("{DAS}");
+
+        // parse the messages
+        StellarParserRunner runner = new StellarParserRunner("bro")
+                .withParserConfiguration(broParserConfig)
+                .withContext(Context.EMPTY_CONTEXT());
+        List<JSONObject> messages = runner.parse(toParse);
+
+        // expect an error message to be returned
+        JSONObject error = messages.get(0);
+        Assert.assertEquals(toParse.get(0), error.get("raw_message"));
+        Assert.assertEquals(Constants.ERROR_TYPE, 
error.get(Constants.SENSOR_TYPE));
+        Assert.assertEquals("parser_error", error.get(ERROR_TYPE.getName()));
+        Assert.assertTrue(error.containsKey(MESSAGE.getName()));
+        Assert.assertTrue(error.containsKey(EXCEPTION.getName()));
+        Assert.assertTrue(error.containsKey(STACK.getName()));
+        Assert.assertTrue(error.containsKey(ERROR_HASH.getName()));
+        Assert.assertTrue(error.containsKey(Constants.GUID));
+    }
+
+    @Test
+    public void testToString() {
+        List<String> toParse = new ArrayList<>();
+        toParse.add(broMessage);
+        toParse.add("{DAS}");
+
+        // parse the messages
+        StellarParserRunner runner = new StellarParserRunner("bro")
+                .withParserConfiguration(broParserConfig)
+                .withContext(Context.EMPTY_CONTEXT());
+        List<JSONObject> messages = runner.parse(toParse);
+
+        // toString() should tally the number of successes and failures
+        Assert.assertEquals("Parser{1 successful, 1 error(s)}", 
runner.toString());
+    }
+}

Reply via email to