asfgit closed pull request #1278: METRON-1892 Parser Debugger Should Load
Config From Zookeeper
URL: https://github.com/apache/metron/pull/1278
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/metron-platform/metron-management/src/main/java/org/apache/metron/management/Functions.java
b/metron-platform/metron-management/src/main/java/org/apache/metron/management/Functions.java
index 5e591aa600..e67f0ed663 100644
---
a/metron-platform/metron-management/src/main/java/org/apache/metron/management/Functions.java
+++
b/metron-platform/metron-management/src/main/java/org/apache/metron/management/Functions.java
@@ -19,11 +19,16 @@
package org.apache.metron.management;
+import org.apache.curator.framework.CuratorFramework;
import org.apache.metron.stellar.common.utils.ConversionUtils;
+import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.ParseException;
import java.util.List;
+import java.util.Optional;
import static java.lang.String.format;
+import static
org.apache.metron.stellar.dsl.Context.Capabilities.ZOOKEEPER_CLIENT;
/**
* Contains utility functionality that is useful across all of the Stellar
management functions.
@@ -39,10 +44,10 @@
* @param args All of the arguments.
* @param <T> The type of the argument expected.
*/
- public static <T> T getArg(String argName, int index, Class<T> clazz,
List<Object> args) {
+ public static <T> T getArg(String argName, int index, Class<T> clazz,
List<Object> args) throws ParseException {
if(index >= args.size()) {
String msg = format("missing '%s'; expected at least %d argument(s),
found %d", argName, index+1, args.size());
- throw new IllegalArgumentException(msg);
+ throw new ParseException(msg);
}
return ConversionUtils.convert(args.get(index), clazz);
@@ -69,4 +74,18 @@
return result;
}
+
+ /**
+ * Retrieves the Zookeeper client from the execution context.
+ *
+ * @param context The execution context.
+ * @return A Zookeeper client, if one exists. Otherwise, an exception is
thrown.
+ */
+ public static CuratorFramework getZookeeperClient(Context context) throws
ParseException {
+ return context
+ .getCapability(ZOOKEEPER_CLIENT, false)
+ .filter(CuratorFramework.class::isInstance)
+ .map(CuratorFramework.class::cast)
+ .orElseThrow(() -> new ParseException("Missing ZOOKEEPER_CLIENT;
zookeeper connection required"));
+ }
}
diff --git
a/metron-platform/metron-management/src/main/java/org/apache/metron/management/ParserFunctions.java
b/metron-platform/metron-management/src/main/java/org/apache/metron/management/ParserFunctions.java
index 6a028125b2..fcb91f9ad8 100644
---
a/metron-platform/metron-management/src/main/java/org/apache/metron/management/ParserFunctions.java
+++
b/metron-platform/metron-management/src/main/java/org/apache/metron/management/ParserFunctions.java
@@ -19,7 +19,9 @@
package org.apache.metron.management;
-import com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.stellar.dsl.BaseStellarFunction;
import org.apache.metron.stellar.dsl.Context;
import org.apache.metron.stellar.dsl.ParseException;
@@ -31,7 +33,9 @@
import java.util.Map;
import static java.lang.String.format;
+import static
org.apache.metron.common.configuration.ConfigurationsUtils.readSensorParserConfigFromZookeeper;
import static org.apache.metron.management.Functions.getArg;
+import static org.apache.metron.management.Functions.getZookeeperClient;
import static org.apache.metron.management.Functions.hasArg;
/**
@@ -49,25 +53,32 @@
},
returns = "A parser that can be used to parse messages."
)
- public static class InitializeFunction extends BaseStellarFunction {
+ public static class InitializeFunction implements StellarFunction {
@Override
- public Object apply(List<Object> args) throws ParseException {
-
+ public Object apply(List<Object> args, Context context) throws
ParseException {
String sensorType = getArg("sensorType", 0, String.class, args);
StellarParserRunner parser = new StellarParserRunner(sensorType);
// handle the parser configuration argument
String configArgName = "config";
- if(hasArg(configArgName, 1, String.class, args)) {
+ if(args.size() == 1) {
+ // no config passed by user, attempt to retrieve from zookeeper
+ SensorParserConfig config = readFromZookeeper(context, sensorType);
+ parser.withParserConfiguration(sensorType, config);
+
+ } else if(hasArg(configArgName, 1, String.class, args)) {
// parser config passed in as a string
String arg = getArg(configArgName, 1, String.class, args);
parser.withParserConfiguration(arg);
- } else {
- // parser configuration passed in as a map
+ } else if(hasArg(configArgName, 1, Map.class, args)){
+ // parser config passed in as a map
Map<String, Object> arg = getArg(configArgName, 1, Map.class, args);
parser.withParserConfiguration(arg);
+
+ } else {
+ throw new ParseException(format("unexpected '%s' argument; expected
string or map", configArgName));
}
// handle the 'globals' argument which is optional
@@ -78,6 +89,33 @@ public Object apply(List<Object> args) throws ParseException
{
return parser;
}
+
+ private SensorParserConfig readFromZookeeper(Context context, String
sensorType) throws ParseException {
+ SensorParserConfig config;
+ try {
+ CuratorFramework zkClient = getZookeeperClient(context);
+ config = readSensorParserConfigFromZookeeper(sensorType, zkClient);
+
+ } catch(Exception e) {
+ throw new ParseException(ExceptionUtils.getRootCauseMessage(e), e);
+ }
+
+ if(config == null) {
+ throw new ParseException("Unable to read configuration from Zookeeper;
sensorType = " + sensorType);
+ }
+
+ return config;
+ }
+
+ @Override
+ public void initialize(Context context) {
+ // nothing to do
+ }
+
+ @Override
+ public boolean isInitialized() {
+ return true;
+ }
}
@Stellar(
diff --git
a/metron-platform/metron-management/src/main/java/org/apache/metron/management/StellarParserRunner.java
b/metron-platform/metron-management/src/main/java/org/apache/metron/management/StellarParserRunner.java
index 38dba39109..88843932d1 100644
---
a/metron-platform/metron-management/src/main/java/org/apache/metron/management/StellarParserRunner.java
+++
b/metron-platform/metron-management/src/main/java/org/apache/metron/management/StellarParserRunner.java
@@ -26,7 +26,6 @@
import org.json.simple.JSONObject;
import java.io.IOException;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -111,6 +110,12 @@ public StellarParserRunner
withParserConfiguration(Map<String, Object> config) {
return this;
}
+ public StellarParserRunner withParserConfiguration(String sensorType,
SensorParserConfig config) {
+ parserConfigurations = new ParserConfigurations();
+ parserConfigurations.updateSensorParserConfig(sensorType, config);
+ return this;
+ }
+
public StellarParserRunner withContext(Context context) {
this.context = context;
return this;
diff --git
a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ParserFunctionsTest.java
b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ParserFunctionsTest.java
index aed8b858af..5510c67405 100644
---
a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ParserFunctionsTest.java
+++
b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ParserFunctionsTest.java
@@ -20,15 +20,19 @@
package org.apache.metron.management;
import org.adrianwalker.multilinestring.Multiline;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.GetDataBuilder;
import org.apache.metron.common.Constants;
-import org.apache.metron.common.configuration.ParserConfigurations;
import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.stellar.common.DefaultStellarStatefulExecutor;
import org.apache.metron.stellar.common.StellarStatefulExecutor;
import org.apache.metron.stellar.dsl.Context;
+import org.apache.metron.stellar.dsl.ParseException;
import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver;
import org.apache.metron.stellar.dsl.functions.resolver.SimpleFunctionResolver;
+import org.apache.zookeeper.KeeperException;
import org.json.simple.JSONObject;
+import org.json.simple.parser.JSONParser;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -50,6 +54,8 @@
import static org.apache.metron.common.Constants.Fields.DST_PORT;
import static org.apache.metron.common.Constants.Fields.SRC_ADDR;
import static org.apache.metron.common.Constants.Fields.SRC_PORT;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* Tests the {@link ParserFunctions} class.
@@ -217,6 +223,93 @@ public void testConfig() throws Exception {
Assert.assertNotNull(SensorParserConfig.fromBytes(config.getBytes()));
}
+ @Test
+ public void testInitFromString() throws Exception {
+ set("configAsString", broParserConfig);
+ StellarParserRunner runner = execute("PARSER_INIT('bro', configAsString)",
StellarParserRunner.class);
+
+ Assert.assertNotNull(runner);
+ SensorParserConfig actual =
runner.getParserConfigurations().getSensorParserConfig("bro");
+ SensorParserConfig expected =
SensorParserConfig.fromBytes(broParserConfig.getBytes());
+ Assert.assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testInitFromMap() throws Exception {
+ Map<String, Object> configAsMap = (JSONObject) new
JSONParser().parse(broParserConfig);
+ set("configAsMap", configAsMap);
+ StellarParserRunner runner = execute("PARSER_INIT('bro', configAsMap)",
StellarParserRunner.class);
+
+ Assert.assertNotNull(runner);
+ SensorParserConfig actual =
runner.getParserConfigurations().getSensorParserConfig("bro");
+ SensorParserConfig expected =
SensorParserConfig.fromBytes(broParserConfig.getBytes());
+ Assert.assertEquals(expected, actual);
+ }
+
+ @Test(expected = ParseException.class)
+ public void testInitFromInvalidValue() throws Exception {
+ execute("PARSER_INIT('bro', 22)", StellarParserRunner.class);
+ Assert.fail("expected exception");
+ }
+
+ @Test
+ public void testInitFromZookeeper() throws Exception {
+ byte[] configAsBytes = broParserConfig.getBytes();
+ CuratorFramework zkClient =
zkClientForPath("/metron/topology/parsers/bro", configAsBytes);
+ context.addCapability(Context.Capabilities.ZOOKEEPER_CLIENT, () ->
zkClient);
+
+ StellarParserRunner runner = execute("PARSER_INIT('bro')",
StellarParserRunner.class);
+
+ Assert.assertNotNull(runner);
+ SensorParserConfig actual =
runner.getParserConfigurations().getSensorParserConfig("bro");
+ SensorParserConfig expected =
SensorParserConfig.fromBytes(broParserConfig.getBytes());
+ Assert.assertEquals(expected, actual);
+ }
+
+ @Test(expected = ParseException.class)
+ public void testInitMissingFromZookeeper() throws Exception {
+ // there is no config for 'bro' in zookeeper
+ CuratorFramework zkClient =
zkClientMissingPath("/metron/topology/parsers/bro");
+ context.addCapability(Context.Capabilities.ZOOKEEPER_CLIENT, () ->
zkClient);
+
+ execute("PARSER_INIT('bro')", StellarParserRunner.class);
+ Assert.fail("expected exception");
+ }
+
+ /**
+ * Create a mock Zookeeper client that returns a value for a given path.
+ *
+ * @param path The path within Zookeeper that will be requested.
+ * @param value The value to return when the path is requested.
+ * @return The mock Zookeeper client.
+ */
+ private CuratorFramework zkClientForPath(String path, byte[] value) throws
Exception {
+ GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
+ when(getDataBuilder.forPath(path)).thenReturn(value);
+
+ CuratorFramework zkClient = mock(CuratorFramework.class);
+ when(zkClient.getData()).thenReturn(getDataBuilder);
+
+ return zkClient;
+ }
+
+ /**
+ * Create a mock Zookeeper client that will indicate the given path does not
exist.
+ *
+ * @param path The path that will 'not exist'.
+ * @return The mock Zookeeper client.
+ */
+ private CuratorFramework zkClientMissingPath(String path) throws Exception {
+
+ GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
+ when(getDataBuilder.forPath(path)).thenThrow(new
KeeperException.NoNodeException(path));
+
+ CuratorFramework zkClient = mock(CuratorFramework.class);
+ when(zkClient.getData()).thenReturn(getDataBuilder);
+
+ return zkClient;
+ }
+
private boolean isError(JSONObject message) {
String sensorType = String.class.cast(message.get(Constants.SENSOR_TYPE));
return Constants.ERROR_TYPE.equals(sensorType);
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services