This is an automated email from the ASF dual-hosted git repository.
nickallen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/master by this push:
new 90582b3 METRON-1892 Parser Debugger Should Load Config From Zookeeper
(nickwallen) closes apache/metron#1278
90582b3 is described below
commit 90582b30902b9506d16547c0cdac16e8a7e3a66e
Author: nickwallen <[email protected]>
AuthorDate: Mon Dec 17 09:57:06 2018 -0500
METRON-1892 Parser Debugger Should Load Config From Zookeeper (nickwallen)
closes apache/metron#1278
---
.../org/apache/metron/management/Functions.java | 23 +++++-
.../apache/metron/management/ParserFunctions.java | 52 ++++++++++--
.../metron/management/StellarParserRunner.java | 7 +-
.../metron/management/ParserFunctionsTest.java | 95 +++++++++++++++++++++-
4 files changed, 166 insertions(+), 11 deletions(-)
diff --git
a/metron-platform/metron-management/src/main/java/org/apache/metron/management/Functions.java
b/metron-platform/metron-management/src/main/java/org/apache/metron/management/Functions.java
index 5e591aa..e67f0ed 100644
---
a/metron-platform/metron-management/src/main/java/org/apache/metron/management/Functions.java
+++
b/metron-platform/metron-management/src/main/java/org/apache/metron/management/Functions.java
@@ -19,11 +19,16 @@
package org.apache.metron.management;
+import org.apache.curator.framework.CuratorFramework;
import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.ParseException;
import java.util.List;
+import java.util.Optional;
import static java.lang.String.format;
+import static
org.apache.metron.stellar.dsl.Context.Capabilities.ZOOKEEPER_CLIENT;
/**
* Contains utility functionality that is useful across all of the Stellar
management functions.
@@ -39,10 +44,10 @@ public class Functions {
* @param args All of the arguments.
* @param <T> The type of the argument expected.
*/
- public static <T> T getArg(String argName, int index, Class<T> clazz,
List<Object> args) {
+ public static <T> T getArg(String argName, int index, Class<T> clazz,
List<Object> args) throws ParseException {
if(index >= args.size()) {
String msg = format("missing '%s'; expected at least %d argument(s),
found %d", argName, index+1, args.size());
- throw new IllegalArgumentException(msg);
+ throw new ParseException(msg);
}
return ConversionUtils.convert(args.get(index), clazz);
@@ -69,4 +74,18 @@ public class Functions {
return result;
}
+
+ /**
+ * Retrieves the Zookeeper client from the execution context.
+ *
+ * @param context The execution context.
+ * @return A Zookeeper client, if one exists. Otherwise, an exception is
thrown.
+ */
+ public static CuratorFramework getZookeeperClient(Context context) throws
ParseException {
+ return context
+ .getCapability(ZOOKEEPER_CLIENT, false)
+ .filter(CuratorFramework.class::isInstance)
+ .map(CuratorFramework.class::cast)
+ .orElseThrow(() -> new ParseException("Missing ZOOKEEPER_CLIENT;
zookeeper connection required"));
+ }
}
diff --git
a/metron-platform/metron-management/src/main/java/org/apache/metron/management/ParserFunctions.java
b/metron-platform/metron-management/src/main/java/org/apache/metron/management/ParserFunctions.java
index 6a02812..fcb91f9 100644
---
a/metron-platform/metron-management/src/main/java/org/apache/metron/management/ParserFunctions.java
+++
b/metron-platform/metron-management/src/main/java/org/apache/metron/management/ParserFunctions.java
@@ -19,7 +19,9 @@
package org.apache.metron.management;
-import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.stellar.dsl.BaseStellarFunction;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.stellar.dsl.ParseException;
@@ -31,7 +33,9 @@ import java.util.List;
import java.util.Map;
import static java.lang.String.format;
+import static
org.apache.metron.common.configuration.ConfigurationsUtils.readSensorParserConfigFromZookeeper;
import static org.apache.metron.management.Functions.getArg;
+import static org.apache.metron.management.Functions.getZookeeperClient;
import static org.apache.metron.management.Functions.hasArg;
/**
@@ -49,25 +53,32 @@ public class ParserFunctions {
},
returns = "A parser that can be used to parse messages."
)
- public static class InitializeFunction extends BaseStellarFunction {
+ public static class InitializeFunction implements StellarFunction {
@Override
- public Object apply(List<Object> args) throws ParseException {
-
+ public Object apply(List<Object> args, Context context) throws
ParseException {
String sensorType = getArg("sensorType", 0, String.class, args);
StellarParserRunner parser = new StellarParserRunner(sensorType);
// handle the parser configuration argument
String configArgName = "config";
- if(hasArg(configArgName, 1, String.class, args)) {
+ if(args.size() == 1) {
+ // no config passed by user, attempt to retrieve from zookeeper
+ SensorParserConfig config = readFromZookeeper(context, sensorType);
+ parser.withParserConfiguration(sensorType, config);
+
+ } else if(hasArg(configArgName, 1, String.class, args)) {
// parser config passed in as a string
String arg = getArg(configArgName, 1, String.class, args);
parser.withParserConfiguration(arg);
- } else {
- // parser configuration passed in as a map
+ } else if(hasArg(configArgName, 1, Map.class, args)){
+ // parser config passed in as a map
Map<String, Object> arg = getArg(configArgName, 1, Map.class, args);
parser.withParserConfiguration(arg);
+
+ } else {
+ throw new ParseException(format("unexpected '%s' argument; expected
string or map", configArgName));
}
// handle the 'globals' argument which is optional
@@ -78,6 +89,33 @@ public class ParserFunctions {
return parser;
}
+
+ private SensorParserConfig readFromZookeeper(Context context, String
sensorType) throws ParseException {
+ SensorParserConfig config;
+ try {
+ CuratorFramework zkClient = getZookeeperClient(context);
+ config = readSensorParserConfigFromZookeeper(sensorType, zkClient);
+
+ } catch(Exception e) {
+ throw new ParseException(ExceptionUtils.getRootCauseMessage(e), e);
+ }
+
+ if(config == null) {
+ throw new ParseException("Unable to read configuration from Zookeeper;
sensorType = " + sensorType);
+ }
+
+ return config;
+ }
+
+ @Override
+ public void initialize(Context context) {
+ // nothing to do
+ }
+
+ @Override
+ public boolean isInitialized() {
+ return true;
+ }
}
@Stellar(
diff --git
a/metron-platform/metron-management/src/main/java/org/apache/metron/management/StellarParserRunner.java
b/metron-platform/metron-management/src/main/java/org/apache/metron/management/StellarParserRunner.java
index 38dba39..8884393 100644
---
a/metron-platform/metron-management/src/main/java/org/apache/metron/management/StellarParserRunner.java
+++
b/metron-platform/metron-management/src/main/java/org/apache/metron/management/StellarParserRunner.java
@@ -26,7 +26,6 @@ import org.apache.metron.stellar.dsl.Context;
import org.json.simple.JSONObject;
import java.io.IOException;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -111,6 +110,12 @@ public class StellarParserRunner {
return this;
}
+ public StellarParserRunner withParserConfiguration(String sensorType,
SensorParserConfig config) {
+ parserConfigurations = new ParserConfigurations();
+ parserConfigurations.updateSensorParserConfig(sensorType, config);
+ return this;
+ }
+
public StellarParserRunner withContext(Context context) {
this.context = context;
return this;
diff --git
a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ParserFunctionsTest.java
b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ParserFunctionsTest.java
index aed8b85..5510c67 100644
---
a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ParserFunctionsTest.java
+++
b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ParserFunctionsTest.java
@@ -20,15 +20,19 @@
package org.apache.metron.management;
import org.adrianwalker.multilinestring.Multiline;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.GetDataBuilder;
import org.apache.metron.common.Constants;
-import org.apache.metron.common.configuration.ParserConfigurations;
import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
import org.apache.metron.stellar.common.StellarStatefulExecutor;
import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.ParseException;
import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
+import org.apache.zookeeper.KeeperException;
import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -50,6 +54,8 @@ import static
org.apache.metron.common.Constants.Fields.DST_ADDR;
import static org.apache.metron.common.Constants.Fields.DST_PORT;
import static org.apache.metron.common.Constants.Fields.SRC_ADDR;
import static org.apache.metron.common.Constants.Fields.SRC_PORT;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* Tests the {@link ParserFunctions} class.
@@ -217,6 +223,93 @@ public class ParserFunctionsTest {
Assert.assertNotNull(SensorParserConfig.fromBytes(config.getBytes()));
}
+ @Test
+ public void testInitFromString() throws Exception {
+ set("configAsString", broParserConfig);
+ StellarParserRunner runner = execute("PARSER_INIT('bro', configAsString)",
StellarParserRunner.class);
+
+ Assert.assertNotNull(runner);
+ SensorParserConfig actual =
runner.getParserConfigurations().getSensorParserConfig("bro");
+ SensorParserConfig expected =
SensorParserConfig.fromBytes(broParserConfig.getBytes());
+ Assert.assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testInitFromMap() throws Exception {
+ Map<String, Object> configAsMap = (JSONObject) new
JSONParser().parse(broParserConfig);
+ set("configAsMap", configAsMap);
+ StellarParserRunner runner = execute("PARSER_INIT('bro', configAsMap)",
StellarParserRunner.class);
+
+ Assert.assertNotNull(runner);
+ SensorParserConfig actual =
runner.getParserConfigurations().getSensorParserConfig("bro");
+ SensorParserConfig expected =
SensorParserConfig.fromBytes(broParserConfig.getBytes());
+ Assert.assertEquals(expected, actual);
+ }
+
+ @Test(expected = ParseException.class)
+ public void testInitFromInvalidValue() throws Exception {
+ execute("PARSER_INIT('bro', 22)", StellarParserRunner.class);
+ Assert.fail("expected exception");
+ }
+
+ @Test
+ public void testInitFromZookeeper() throws Exception {
+ byte[] configAsBytes = broParserConfig.getBytes();
+ CuratorFramework zkClient =
zkClientForPath("/metron/topology/parsers/bro", configAsBytes);
+ context.addCapability(Context.Capabilities.ZOOKEEPER_CLIENT, () ->
zkClient);
+
+ StellarParserRunner runner = execute("PARSER_INIT('bro')",
StellarParserRunner.class);
+
+ Assert.assertNotNull(runner);
+ SensorParserConfig actual =
runner.getParserConfigurations().getSensorParserConfig("bro");
+ SensorParserConfig expected =
SensorParserConfig.fromBytes(broParserConfig.getBytes());
+ Assert.assertEquals(expected, actual);
+ }
+
+ @Test(expected = ParseException.class)
+ public void testInitMissingFromZookeeper() throws Exception {
+ // there is no config for 'bro' in zookeeper
+ CuratorFramework zkClient =
zkClientMissingPath("/metron/topology/parsers/bro");
+ context.addCapability(Context.Capabilities.ZOOKEEPER_CLIENT, () ->
zkClient);
+
+ execute("PARSER_INIT('bro')", StellarParserRunner.class);
+ Assert.fail("expected exception");
+ }
+
+ /**
+ * Create a mock Zookeeper client that returns a value for a given path.
+ *
+ * @param path The path within Zookeeper that will be requested.
+ * @param value The value to return when the path is requested.
+ * @return The mock Zookeeper client.
+ */
+ private CuratorFramework zkClientForPath(String path, byte[] value) throws
Exception {
+ GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
+ when(getDataBuilder.forPath(path)).thenReturn(value);
+
+ CuratorFramework zkClient = mock(CuratorFramework.class);
+ when(zkClient.getData()).thenReturn(getDataBuilder);
+
+ return zkClient;
+ }
+
+ /**
+ * Create a mock Zookeeper client that will indicate the given path does not
exist.
+ *
+ * @param path The path that will 'not exist'.
+ * @return The mock Zookeeper client.
+ */
+ private CuratorFramework zkClientMissingPath(String path) throws Exception {
+
+ GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
+ when(getDataBuilder.forPath(path)).thenThrow(new
KeeperException.NoNodeException(path));
+
+ CuratorFramework zkClient = mock(CuratorFramework.class);
+ when(zkClient.getData()).thenReturn(getDataBuilder);
+
+ return zkClient;
+ }
+
private boolean isError(JSONObject message) {
String sensorType = String.class.cast(message.get(Constants.SENSOR_TYPE));
return Constants.ERROR_TYPE.equals(sensorType);