METRON-1529 CONFIG_GET Fails to Retrieve Latest Config When Run in Zeppelin REPL (nickwallen) closes apache/metron#997
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/37e3fd32 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/37e3fd32 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/37e3fd32 Branch: refs/heads/feature/METRON-1090-stellar-assignment Commit: 37e3fd32c256ddc129eb7c1363d78e9095a39748 Parents: b5bf9a9 Author: nickwallen <[email protected]> Authored: Wed Apr 25 09:27:18 2018 -0400 Committer: nickallen <[email protected]> Committed: Wed Apr 25 09:27:18 2018 -0400 ---------------------------------------------------------------------- .../configuration/ConfigurationsUtils.java | 123 +++- .../management/ConfigurationFunctions.java | 564 ++++++++++--------- .../management/ConfigurationFunctionsTest.java | 424 ++++++++++---- .../shell/DefaultStellarShellExecutor.java | 4 +- .../common/utils/StellarProcessorUtils.java | 135 +++-- 5 files changed, 825 insertions(+), 425 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/37e3fd32/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java index a89db63..c7b39f0 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java @@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.JsonNode; import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; +import java.io.InputStream; import java.io.PrintStream; import java.lang.invoke.MethodHandles; import java.nio.file.Files; @@ -45,6 +46,7 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.metron.common.Constants; import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; +import org.apache.metron.common.configuration.profiler.ProfilerConfig; import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.stellar.dsl.Context; import org.apache.metron.stellar.dsl.StellarFunctions; @@ -235,12 +237,99 @@ public class ConfigurationsUtils { ); } + /** + * Reads the global configuration stored in Zookeeper. + * + * @param client The Zookeeper client. + * @return The global configuration, if one exists. Otherwise, null. + * @throws Exception + */ + public static Map<String, Object> readGlobalConfigFromZookeeper(CuratorFramework client) throws Exception { + Map<String, Object> config = null; + + Optional<byte[]> bytes = readFromZookeeperSafely(GLOBAL.getZookeeperRoot(), client); + if(bytes.isPresent()) { + InputStream in = new ByteArrayInputStream(bytes.get()); + config = JSONUtils.INSTANCE.load(in, JSONUtils.MAP_SUPPLIER); + } + + return config; + } + + /** + * Reads the Indexing configuration from Zookeeper. + * + * @param sensorType The type of sensor. + * @param client The Zookeeper client. + * @return The indexing configuration for the given sensor type, if one exists. Otherwise, null. + * @throws Exception + */ + public static Map<String, Object> readSensorIndexingConfigFromZookeeper(String sensorType, CuratorFramework client) throws Exception { + Map<String, Object> config = null; + + Optional<byte[]> bytes = readFromZookeeperSafely(INDEXING.getZookeeperRoot() + "/" + sensorType, client); + if(bytes.isPresent()) { + InputStream in = new ByteArrayInputStream(bytes.get()); + config = JSONUtils.INSTANCE.load(in, JSONUtils.MAP_SUPPLIER); + } + + return config; + } + + /** + * Reads the Enrichment configuration from Zookeeper. + * + * @param sensorType The type of sensor. + * @param client The Zookeeper client. + * @return The Enrichment configuration for the given sensor type, if one exists. Otherwise, null. + * @throws Exception + */ public static SensorEnrichmentConfig readSensorEnrichmentConfigFromZookeeper(String sensorType, CuratorFramework client) throws Exception { - return JSONUtils.INSTANCE.load(new ByteArrayInputStream(readFromZookeeper(ENRICHMENT.getZookeeperRoot() + "/" + sensorType, client)), SensorEnrichmentConfig.class); + SensorEnrichmentConfig config = null; + + Optional<byte[]> bytes = readFromZookeeperSafely(ENRICHMENT.getZookeeperRoot() + "/" + sensorType, client); + if (bytes.isPresent()) { + config = SensorEnrichmentConfig.fromBytes(bytes.get()); + } + + return config; } + /** + * Reads the Parser configuration from Zookeeper. + * + * @param sensorType The type of sensor. + * @param client The Zookeeper client. + * @return The Parser configuration for the given sensor type, if one exists. Otherwise, null. + * @throws Exception + */ public static SensorParserConfig readSensorParserConfigFromZookeeper(String sensorType, CuratorFramework client) throws Exception { - return JSONUtils.INSTANCE.load(new ByteArrayInputStream(readFromZookeeper(PARSER.getZookeeperRoot() + "/" + sensorType, client)), SensorParserConfig.class); + SensorParserConfig config = null; + + Optional<byte[]> bytes = readFromZookeeperSafely(PARSER.getZookeeperRoot() + "/" + sensorType, client); + if(bytes.isPresent()) { + config = SensorParserConfig.fromBytes(bytes.get()); + } + + return config; + } + + /** + * Reads the Profiler configuration from Zookeeper. + * + * @param client The Zookeeper client. + * @return THe Profiler configuration. + * @throws Exception + */ + public static ProfilerConfig readProfilerConfigFromZookeeper(CuratorFramework client) throws Exception { + ProfilerConfig config = null; + + Optional<byte[]> bytes = readFromZookeeperSafely(PROFILER.getZookeeperRoot(), client); + if(bytes.isPresent()) { + config = ProfilerConfig.fromBytes(bytes.get()); + } + + return config; } public static byte[] readGlobalConfigBytesFromZookeeper(CuratorFramework client) throws Exception { @@ -289,6 +378,36 @@ public class ConfigurationsUtils { } } + /** + * Read raw bytes from Zookeeper. + * + * @param path The path to the Zookeeper node to read. + * @param client The Zookeeper client. + * @return The bytes read from Zookeeper, if node exists. Otherwise, null. + * @throws Exception + */ + public static Optional<byte[]> readFromZookeeperSafely(String path, CuratorFramework client) throws Exception { + Optional<byte[]> result = Optional.empty(); + + try { + byte[] bytes = readFromZookeeper(path, client); + result = Optional.of(bytes); + + } catch(KeeperException.NoNodeException e) { + LOG.debug("Zookeeper node missing; path={}", e); + } + + return result; + } + + /** + * Read raw bytes from Zookeeper. + * + * @param path The path to the Zookeeper node to read. + * @param client The Zookeeper client. + * @return The bytes read from Zookeeper. + * @throws Exception If the path does not exist in Zookeeper. + */ public static byte[] readFromZookeeper(String path, CuratorFramework client) throws Exception { if (client != null && client.getData() != null && path != null) { return client.getData().forPath(path); http://git-wip-us.apache.org/repos/asf/metron/blob/37e3fd32/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java b/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java index af90e14..5a1281c 100644 --- a/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java +++ b/metron-platform/metron-management/src/main/java/org/apache/metron/management/ConfigurationFunctions.java @@ -18,26 +18,17 @@ package org.apache.metron.management; import com.fasterxml.jackson.core.JsonProcessingException; -import com.google.common.base.Splitter; -import com.google.common.collect.Iterables; -import java.lang.invoke.MethodHandles; -import java.util.Collections; -import java.util.EnumMap; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.recipes.cache.TreeCache; -import org.apache.curator.framework.recipes.cache.TreeCacheEvent; -import org.apache.curator.framework.recipes.cache.TreeCacheListener; -import org.apache.metron.common.Constants; import org.apache.metron.common.configuration.ConfigurationType; -import org.apache.metron.common.configuration.ConfigurationsUtils; +import org.apache.metron.common.configuration.EnrichmentConfigurations; import org.apache.metron.common.configuration.IndexingConfigurations; +import org.apache.metron.common.configuration.ParserConfigurations; import org.apache.metron.common.configuration.SensorParserConfig; import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; +import org.apache.metron.common.configuration.profiler.ProfilerConfig; +import org.apache.metron.common.configuration.profiler.ProfilerConfigurations; import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.common.zookeeper.ZKConfigurationsCache; import org.apache.metron.stellar.common.utils.ConversionUtils; import org.apache.metron.stellar.dsl.Context; import org.apache.metron.stellar.dsl.ParseException; @@ -46,203 +37,280 @@ import org.apache.metron.stellar.dsl.StellarFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayInputStream; +import java.lang.invoke.MethodHandles; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static java.lang.String.format; +import static org.apache.metron.common.configuration.ConfigurationType.ENRICHMENT; +import static org.apache.metron.common.configuration.ConfigurationType.GLOBAL; +import static org.apache.metron.common.configuration.ConfigurationType.INDEXING; +import static org.apache.metron.common.configuration.ConfigurationType.PARSER; +import static org.apache.metron.common.configuration.ConfigurationType.PROFILER; +import static org.apache.metron.common.configuration.ConfigurationsUtils.readGlobalConfigBytesFromZookeeper; +import static org.apache.metron.common.configuration.ConfigurationsUtils.readGlobalConfigFromZookeeper; +import static org.apache.metron.common.configuration.ConfigurationsUtils.readProfilerConfigBytesFromZookeeper; +import static org.apache.metron.common.configuration.ConfigurationsUtils.readProfilerConfigFromZookeeper; +import static org.apache.metron.common.configuration.ConfigurationsUtils.readSensorEnrichmentConfigFromZookeeper; +import static org.apache.metron.common.configuration.ConfigurationsUtils.readSensorIndexingConfigBytesFromZookeeper; +import static org.apache.metron.common.configuration.ConfigurationsUtils.readSensorIndexingConfigFromZookeeper; +import static org.apache.metron.common.configuration.ConfigurationsUtils.readSensorParserConfigFromZookeeper; +import static org.apache.metron.common.configuration.ConfigurationsUtils.writeGlobalConfigToZookeeper; +import static org.apache.metron.common.configuration.ConfigurationsUtils.writeProfilerConfigToZookeeper; +import static org.apache.metron.common.configuration.ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper; +import static org.apache.metron.common.configuration.ConfigurationsUtils.writeSensorIndexingConfigToZookeeper; +import static org.apache.metron.common.configuration.ConfigurationsUtils.writeSensorParserConfigToZookeeper; + +/** + * Defines functions that enable modification of Metron configuration values. + */ public class ConfigurationFunctions { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - private static EnumMap<ConfigurationType, Object> configMap = new EnumMap<ConfigurationType, Object>(ConfigurationType.class) {{ - for(ConfigurationType ct : ConfigurationType.values()) { - put(ct, Collections.synchronizedMap(new HashMap<String, String>())); - } - put(ConfigurationType.GLOBAL, ""); - put(ConfigurationType.PROFILER, ""); - }}; - private static synchronized void setupTreeCache(Context context) throws Exception { - try { - Optional<Object> treeCacheOpt = context.getCapability("treeCache"); - if (treeCacheOpt.isPresent()) { - return; - } + + + /** + * Retrieves the Zookeeper client from the execution context. + * + * @param context The execution context. + * @return A Zookeeper client, if one exists. Otherwise, an exception is thrown. + */ + private static CuratorFramework getZookeeperClient(Context context) { + + Optional<Object> clientOpt = context.getCapability(Context.Capabilities.ZOOKEEPER_CLIENT, true); + if(clientOpt.isPresent()) { + return (CuratorFramework) clientOpt.get(); + + } else { + throw new IllegalStateException("Missing ZOOKEEPER_CLIENT; zookeeper connection required"); } - catch(IllegalStateException ex) { + } + /** + * Get an argument from a list of arguments. + * + * @param index The index within the list of arguments. + * @param clazz The type expected. + * @param args All of the arguments. + * @param <T> The type of the argument expected. + */ + public static <T> T getArg(int index, Class<T> clazz, List<Object> args) { + + if(index >= args.size()) { + throw new IllegalArgumentException(format("expected at least %d argument(s), found %d", index+1, args.size())); } - Optional<Object> clientOpt = context.getCapability(Context.Capabilities.ZOOKEEPER_CLIENT); - if(!clientOpt.isPresent()) { - throw new IllegalStateException("I expected a zookeeper client to exist and it did not. Please connect to zookeeper."); + + return ConversionUtils.convert(args.get(index), clazz); + } + + /** + * Serializes a configuration object to the raw JSON. + * + * @param object The configuration object to serialize + * @return + */ + private static String toJSON(Object object) { + + if(object == null) { + return null; } - CuratorFramework client = (CuratorFramework) clientOpt.get(); - TreeCache cache = new TreeCache(client, Constants.ZOOKEEPER_TOPOLOGY_ROOT); - TreeCacheListener listener = new TreeCacheListener() { - @Override - public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { - if (event.getType().equals(TreeCacheEvent.Type.NODE_ADDED) || event.getType().equals(TreeCacheEvent.Type.NODE_UPDATED)) { - String path = event.getData().getPath(); - byte[] data = event.getData().getData(); - String sensor = Iterables.getLast(Splitter.on("/").split(path), null); - if (path.startsWith(ConfigurationType.PARSER.getZookeeperRoot())) { - Map<String, String> sensorMap = (Map<String, String>)configMap.get(ConfigurationType.PARSER); - sensorMap.put(sensor, new String(data)); - } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) { - configMap.put(ConfigurationType.GLOBAL, new String(data)); - } else if (ConfigurationType.PROFILER.getZookeeperRoot().equals(path)) { - configMap.put(ConfigurationType.PROFILER, new String(data)); - } else if (path.startsWith(ConfigurationType.ENRICHMENT.getZookeeperRoot())) { - Map<String, String> sensorMap = (Map<String, String>)configMap.get(ConfigurationType.ENRICHMENT); - sensorMap.put(sensor, new String(data)); - } else if (path.startsWith(ConfigurationType.INDEXING.getZookeeperRoot())) { - Map<String, String> sensorMap = (Map<String, String>)configMap.get(ConfigurationType.INDEXING); - sensorMap.put(sensor, new String(data)); - } - } - else if(event.getType().equals(TreeCacheEvent.Type.NODE_REMOVED)) { - String path = event.getData().getPath(); - String sensor = Iterables.getLast(Splitter.on("/").split(path), null); - if (path.startsWith(ConfigurationType.PARSER.getZookeeperRoot())) { - Map<String, String> sensorMap = (Map<String, String>)configMap.get(ConfigurationType.PARSER); - sensorMap.remove(sensor); - } - else if (path.startsWith(ConfigurationType.ENRICHMENT.getZookeeperRoot())) { - Map<String, String> sensorMap = (Map<String, String>)configMap.get(ConfigurationType.ENRICHMENT); - sensorMap.remove(sensor); - } - else if (path.startsWith(ConfigurationType.INDEXING.getZookeeperRoot())) { - Map<String, String> sensorMap = (Map<String, String>)configMap.get(ConfigurationType.INDEXING); - sensorMap.remove(sensor); - } - else if (ConfigurationType.PROFILER.getZookeeperRoot().equals(path)) { - configMap.put(ConfigurationType.PROFILER, null); - } - else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) { - configMap.put(ConfigurationType.GLOBAL, null); - } - } - } - }; - cache.getListenable().addListener(listener); - cache.start(); - for(ConfigurationType ct : ConfigurationType.values()) { - switch(ct) { - case GLOBAL: - case PROFILER: - { - String data = ""; - try { - byte[] bytes = ConfigurationsUtils.readFromZookeeper(ct.getZookeeperRoot(), client); - data = new String(bytes); - } - catch(Exception ex) { - - } - configMap.put(ct, data); - } - break; - case INDEXING: - case ENRICHMENT: - case PARSER: - { - List<String> sensorTypes = client.getChildren().forPath(ct.getZookeeperRoot()); - Map<String, String> sensorMap = (Map<String, String>)configMap.get(ct); - for(String sensorType : sensorTypes) { - sensorMap.put(sensorType, new String(ConfigurationsUtils.readFromZookeeper(ct.getZookeeperRoot() + "/" + sensorType, client))); - } - } - break; - } + + try { + return JSONUtils.INSTANCE.toJSON(object, true); + + } catch (JsonProcessingException e) { + throw new RuntimeException(e); } - context.addCapability("treeCache", () -> cache); } @Stellar( - namespace = "CONFIG" - ,name = "GET" - ,description = "Retrieve a Metron configuration from zookeeper." - ,params = {"type - One of ENRICHMENT, INDEXING, PARSER, GLOBAL, PROFILER" - , "sensor - Sensor to retrieve (required for enrichment and parser, not used for profiler and global)" - , "emptyIfNotPresent - If true, then return an empty, minimally viable config" - } - ,returns = "The String representation of the config in zookeeper" - ) + namespace = "CONFIG", + name = "GET", + description = "Retrieve a Metron configuration from zookeeper.", + params = { + "type - One of ENRICHMENT, INDEXING, PARSER, GLOBAL, PROFILER", + "sensor - Sensor to retrieve (required for enrichment and parser, not used for profiler and global)", + "emptyIfNotPresent - If true, then return an empty, minimally viable config" + }, + returns = "The String representation of the config in zookeeper") public static class ConfigGet implements StellarFunction { - boolean initialized = false; + + /** + * Whether the function has been initialized. + */ + private boolean initialized = false; + + /** + * The Zookeeper client. + */ + private CuratorFramework zkClient; + @Override public Object apply(List<Object> args, Context context) throws ParseException { - ConfigurationType type = ConfigurationType.valueOf((String)args.get(0)); - boolean emptyIfNotPresent = true; + String result; - switch(type) { - case GLOBAL: - case PROFILER: - return configMap.get(type); - case PARSER: { - String sensor = (String) args.get(1); - if(args.size() > 2) { - emptyIfNotPresent = ConversionUtils.convert(args.get(2), Boolean.class); - } - Map<String, String> sensorMap = (Map<String, String>) configMap.get(type); - String ret = sensorMap.get(sensor); - if (ret == null && emptyIfNotPresent ) { - SensorParserConfig config = new SensorParserConfig(); - config.setSensorTopic(sensor); - try { - ret = JSONUtils.INSTANCE.toJSON(config, true); - } catch (JsonProcessingException e) { - LOG.error("Unable to serialize default object: {}", e.getMessage(), e); - throw new ParseException("Unable to serialize default object: " + e.getMessage(), e); - } - } - return ret; - } - case INDEXING: { - String sensor = (String) args.get(1); - if(args.size() > 2) { - emptyIfNotPresent = ConversionUtils.convert(args.get(2), Boolean.class); - } - Map<String, String> sensorMap = (Map<String, String>) configMap.get(type); - String ret = sensorMap.get(sensor); - if (ret == null && emptyIfNotPresent ) { - Map<String, Object> config = new HashMap<>(); - try { - ret = JSONUtils.INSTANCE.toJSON(config, true); - IndexingConfigurations.setIndex(config, sensor); - } catch (JsonProcessingException e) { - LOG.error("Unable to serialize default object: {}", e.getMessage(), e); - throw new ParseException("Unable to serialize default object: " + e.getMessage(), e); - } - } - return ret; - } - case ENRICHMENT: { - String sensor = (String) args.get(1); - if(args.size() > 2) { - emptyIfNotPresent = ConversionUtils.convert(args.get(2), Boolean.class); - } - Map<String, String> sensorMap = (Map<String, String>) configMap.get(type); - String ret = sensorMap.get(sensor); - if (ret == null && emptyIfNotPresent ) { - SensorEnrichmentConfig config = new SensorEnrichmentConfig(); - try { - ret = JSONUtils.INSTANCE.toJSON(config, true); - } catch (JsonProcessingException e) { - LOG.error("Unable to serialize default object: {}", e.getMessage(), e); - throw new ParseException("Unable to serialize default object: " + e.getMessage(), e); - } - } - return ret; + // the configuration type to write + String arg0 = getArg(0, String.class, args); + ConfigurationType type = ConfigurationType.valueOf(arg0); + + try { + + if (GLOBAL == type) { + result = getGlobalConfig(args); + + } else if (PROFILER == type) { + result = getProfilerConfig(args); + + } else if (ENRICHMENT == type) { + result = getEnrichmentConfig(args); + + } else if (INDEXING == type) { + result = getIndexingConfig(args); + + } else if (PARSER == type) { + result = getParserConfig(args); + + } else { + throw new IllegalArgumentException("Unexpected configuration type: " + type); } - default: - throw new UnsupportedOperationException("Unable to support type " + type); + + } catch(Exception e) { + throw new RuntimeException(e); } + + return result; } - @Override - public void initialize(Context context) { - try { - setupTreeCache(context); - } catch (Exception e) { - LOG.error("Unable to initialize: {}", e.getMessage(), e); + /** + * Retrieves the Global configuration. + * + * @return The Global configuration. + * @throws Exception + */ + private String getGlobalConfig(List<Object> args) throws Exception { + + Map<String, Object> globals = readGlobalConfigFromZookeeper(zkClient); + + // provide empty/default config if one is not present? + if(globals == null && emptyIfNotPresent(args)) { + globals = new HashMap<>(); } - finally { - initialized = true; + + return toJSON(globals); + } + + /** + * Retrieves the Parser configuration. + * + * @param args The function arguments. + * @return The Parser configuration. + * @throws Exception + */ + private String getParserConfig(List<Object> args) throws Exception { + + // retrieve the enrichment config for the given sensor + String sensor = getArg(1, String.class, args); + SensorParserConfig sensorConfig = readSensorParserConfigFromZookeeper(sensor, zkClient); + + // provide empty/default config if one is not present? + if(sensorConfig == null && emptyIfNotPresent(args)) { + sensorConfig = new SensorParserConfig(); } + + return toJSON(sensorConfig); + } + + /** + * Retrieve the Enrichment configuration. + * + * @param args The function arguments. + * @return The Enrichment configuration as a JSON string. + * @throws Exception + */ + private String getEnrichmentConfig(List<Object> args) throws Exception { + + // retrieve the enrichment config for the given sensor + String sensor = getArg(1, String.class, args); + SensorEnrichmentConfig sensorConfig = readSensorEnrichmentConfigFromZookeeper(sensor, zkClient); + + // provide empty/default config if one is not present? + if(sensorConfig == null && emptyIfNotPresent(args)) { + sensorConfig = new SensorEnrichmentConfig(); + } + + return toJSON(sensorConfig); + } + + /** + * Retrieve the Indexing configuration. + * + * @param args The function arguments. + * @return The Indexing configuration as a JSON string. + * @throws Exception + */ + private String getIndexingConfig(List<Object> args) throws Exception { + + // retrieve the enrichment config for the given sensor + String sensor = getArg(1, String.class, args); + Map<String, Object> sensorConfig = readSensorIndexingConfigFromZookeeper(sensor, zkClient); + + // provide empty/default config if one is not present? + if(sensorConfig == null && emptyIfNotPresent(args)) { + sensorConfig = Collections.emptyMap(); + } + + return toJSON(sensorConfig); + } + + /** + * Retrieve the Profiler configuration. + * + * @param args The function arguments. + * @return The Profiler configuration as a JSON string. + * @throws Exception + */ + private String getProfilerConfig(List<Object> args) throws Exception { + + ProfilerConfig profilerConfig = readProfilerConfigFromZookeeper(zkClient); + + // provide empty/default config if one is not present? + if(profilerConfig == null && emptyIfNotPresent(args)) { + profilerConfig = new ProfilerConfig(); + } + + return toJSON(profilerConfig); + } + + /** + * Retrieves the 'emptyIfNotPresent' argument. + * + * <p>This determines whether a default configuration should be returned, if no + * configuration is not present. This defaults to true. + * + * @param args The function arguments. + * @return The 'emptyIfNotPresent' argument. + * @throws Exception + */ + private boolean emptyIfNotPresent(List<Object> args) { + + boolean emptyIfNotPresent = true; + int lastIndex = args.size() - 1; + + // expect 'emptyIfNotPresent' to always be the last boolean arg + if(args.size() >= 2 && args.get(lastIndex) instanceof Boolean) { + emptyIfNotPresent = getArg(lastIndex, Boolean.class, args); + } + + return emptyIfNotPresent; + } + + @Override + public void initialize(Context context) { + zkClient = getZookeeperClient(context); } @Override @@ -250,91 +318,69 @@ public class ConfigurationFunctions { return initialized; } } + @Stellar( - namespace = "CONFIG" - ,name = "PUT" - ,description = "Updates a Metron config to Zookeeper." - ,params = {"type - One of ENRICHMENT, INDEXING, PARSER, GLOBAL, PROFILER" - ,"config - The config (a string in JSON form) to update" - , "sensor - Sensor to retrieve (required for enrichment and parser, not used for profiler and global)" - } - ,returns = "The String representation of the config in zookeeper" - ) + namespace = "CONFIG", + name = "PUT", + description = "Updates a Metron config to Zookeeper.", + params = { + "type - One of ENRICHMENT, INDEXING, PARSER, GLOBAL, PROFILER", + "config - The config (a string in JSON form) to update", + "sensor - Sensor to retrieve (required for enrichment and parser, not used for profiler and global)" + }, + returns = "The String representation of the config in zookeeper") public static class ConfigPut implements StellarFunction { - private CuratorFramework client; - private boolean initialized = false; @Override public Object apply(List<Object> args, Context context) throws ParseException { - ConfigurationType type = ConfigurationType.valueOf((String)args.get(0)); - String config = (String)args.get(1); - if(config == null) { - return null; - } - try { - switch (type) { - case GLOBAL: - ConfigurationsUtils.writeGlobalConfigToZookeeper(config.getBytes(), client); - break; - case PROFILER: - ConfigurationsUtils.writeProfilerConfigToZookeeper(config.getBytes(), client); - break; - case ENRICHMENT: - { - String sensor = (String) args.get(2); - if(sensor == null) { - return null; - } - ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper(sensor, config.getBytes(), client); - } - break; - case INDEXING: - { - String sensor = (String) args.get(2); - if(sensor == null) { - return null; - } - ConfigurationsUtils.writeSensorIndexingConfigToZookeeper(sensor, config.getBytes(), client); - } - break; - case PARSER: - { - String sensor = (String) args.get(2); - if(sensor == null) { - return null; - } - ConfigurationsUtils.writeSensorParserConfigToZookeeper(sensor, config.getBytes(), client); + + // the configuration type to write + String arg0 = getArg(0, String.class, args); + ConfigurationType type = ConfigurationType.valueOf(arg0); + + // the configuration value to write + String value = getArg(1, String.class, args); + if(value != null) { + + CuratorFramework client = getZookeeperClient(context); + try { + + if(GLOBAL == type) { + writeGlobalConfigToZookeeper(value.getBytes(), client); + + } else if(PROFILER == type) { + writeProfilerConfigToZookeeper(value.getBytes(), client); + + } else if(ENRICHMENT == type) { + String sensor = getArg(2, String.class, args); + writeSensorEnrichmentConfigToZookeeper(sensor, value.getBytes(), client); + + } else if(INDEXING == type) { + String sensor = getArg(2, String.class, args); + writeSensorIndexingConfigToZookeeper(sensor, value.getBytes(), client); + + } else if (PARSER == type) { + String sensor = getArg(2, String.class, args); + writeSensorParserConfigToZookeeper(sensor, value.getBytes(), client); } - break; + + } catch(Exception e) { + LOG.error("Unexpected exception: {}", e.getMessage(), e); + throw new ParseException(e.getMessage()); } } - catch(Exception ex) { - LOG.error("Unable to put config: {}", ex.getMessage(), ex); - throw new ParseException("Unable to put config: " + ex.getMessage(), ex); - } + return null; } @Override public void initialize(Context context) { - Optional<Object> clientOpt = context.getCapability(Context.Capabilities.ZOOKEEPER_CLIENT); - if(!clientOpt.isPresent()) { - throw new IllegalStateException("I expected a zookeeper client to exist and it did not. Please connect to zookeeper."); - } - client = (CuratorFramework) clientOpt.get(); - try { - setupTreeCache(context); - } catch (Exception e) { - LOG.error("Unable to initialize: {}", e.getMessage(), e); - } - finally { - initialized = true; - } + // nothing to do } @Override public boolean isInitialized() { - return initialized; + return true; } } } http://git-wip-us.apache.org/repos/asf/metron/blob/37e3fd32/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java index 1920031..67e2a9d 100644 --- a/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java +++ b/metron-platform/metron-management/src/test/java/org/apache/metron/management/ConfigurationFunctionsTest.java @@ -19,194 +19,393 @@ package org.apache.metron.management; import com.google.common.collect.ImmutableMap; import org.adrianwalker.multilinestring.Multiline; +import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.PosixParser; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.test.TestingServer; import org.apache.log4j.Level; import org.apache.metron.common.cli.ConfigurationManager; import org.apache.metron.common.configuration.ConfigurationsUtils; +import org.apache.metron.common.configuration.SensorParserConfig; +import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; +import org.apache.metron.common.configuration.profiler.ProfilerConfig; import org.apache.metron.stellar.dsl.Context; import org.apache.metron.stellar.dsl.ParseException; import org.apache.metron.test.utils.UnitTestHelper; -import org.json.simple.parser.JSONParser; import org.json.simple.JSONObject; -import org.junit.Assert; +import org.json.simple.parser.JSONParser; +import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; -import java.util.HashMap; +import java.util.Collections; +import java.util.Map; import static org.apache.metron.TestConstants.PARSER_CONFIGS_PATH; import static org.apache.metron.TestConstants.SAMPLE_CONFIG_PATH; +import static org.apache.metron.common.configuration.ConfigurationType.GLOBAL; +import static org.apache.metron.common.configuration.ConfigurationType.PROFILER; +import static org.apache.metron.common.configuration.ConfigurationsUtils.writeProfilerConfigToZookeeper; import static org.apache.metron.management.utils.FileUtils.slurp; import static org.apache.metron.stellar.common.utils.StellarProcessorUtils.run; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +/** + * Tests the ConfigurationFunctions class. + */ public class ConfigurationFunctionsTest { + private static TestingServer testZkServer; - private static CuratorFramework client; private static String zookeeperUrl; - private Context context = new Context.Builder() - .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client) - .build(); + private static CuratorFramework client; + private static String goodGlobalConfig = slurp( SAMPLE_CONFIG_PATH+ "/global.json"); + private static String goodTestEnrichmentConfig = slurp( SAMPLE_CONFIG_PATH + "/enrichments/test.json"); + private static String goodBroParserConfig = slurp(PARSER_CONFIGS_PATH + "/parsers/bro.json"); + private static String goodTestIndexingConfig = slurp( SAMPLE_CONFIG_PATH + "/indexing/test.json"); + + private Context context; + private JSONParser parser; + + /** + * { + * "profiles" : [ + * { + * "profile" : "counter", + * "foreach" : "ip_src_addr", + * "init" : { "counter" : 0 }, + * "update" : { "counter" : "counter + 1" }, + * "result" : "counter" + * } + * ], + * "timestampField" : "timestamp" + * } + */ + @Multiline + private static String goodProfilerConfig; + @BeforeClass - public static void setup() throws Exception { + public static void setupZookeeper() throws Exception { + + // zookeeper server testZkServer = new TestingServer(true); zookeeperUrl = testZkServer.getConnectString(); + + // zookeeper client client = ConfigurationsUtils.getClient(zookeeperUrl); client.start(); + } - pushConfigs(SAMPLE_CONFIG_PATH); - pushConfigs(PARSER_CONFIGS_PATH); + @Before + public void setup() throws Exception { + context = new Context.Builder() + .with(Context.Capabilities.ZOOKEEPER_CLIENT, () -> client) + .build(); + + parser = new JSONParser(); + // push configs to zookeeper + pushConfigs(SAMPLE_CONFIG_PATH, zookeeperUrl); + pushConfigs(PARSER_CONFIGS_PATH, zookeeperUrl); + writeProfilerConfigToZookeeper(goodProfilerConfig.getBytes(), client); } - private static void pushConfigs(String inputPath) throws Exception { - String[] args = new String[]{ - "-z", zookeeperUrl - , "--mode", "PUSH" - , "--input_dir", inputPath - }; - ConfigurationManager manager = new ConfigurationManager(); - manager.run(ConfigurationManager.ConfigurationOptions.parse(new PosixParser(), args)); + /** + * Deletes a path within Zookeeper. + * + * @param path The path within Zookeeper to delete. + * @throws Exception + */ + private void deletePath(String path) throws Exception { + client.delete().forPath(path); } + /** + * Transforms a String to a {@link JSONObject}. + * + * @param input The input String to transform + * @return A {@link JSONObject}. + * @throws org.json.simple.parser.ParseException + */ + private JSONObject toJSONObject(String input) throws org.json.simple.parser.ParseException { - static String goodBroParserConfig = slurp(PARSER_CONFIGS_PATH + "/parsers/bro.json"); + if(input == null) { + return null; + } + return (JSONObject) parser.parse(input.trim()); + } /** - { - "sensorTopic" : "brop", - "parserConfig" : { }, - "fieldTransformations" : [ ], - "readMetadata":false, - "mergeMetadata":false, - "parserParallelism" : 1, - "errorWriterParallelism" : 1, - "spoutNumTasks" : 1, - "stormConfig" : {}, - "errorWriterNumTasks":1, - "spoutConfig":{}, - "parserNumTasks":1, - "spoutParallelism":1 - } + * Push configuration values to Zookeeper. + * + * @param inputPath The local filesystem path to the configurations. + * @param zookeeperUrl The URL of Zookeeper. + * @throws Exception */ - @Multiline - static String defaultBropParserConfig; + private static void pushConfigs(String inputPath, String zookeeperUrl) throws Exception { + + String[] args = new String[] { + "-z", zookeeperUrl, + "--mode", "PUSH", + "--input_dir", inputPath + }; + CommandLine cli = ConfigurationManager.ConfigurationOptions.parse(new PosixParser(), args); + ConfigurationManager manager = new ConfigurationManager(); + manager.run(cli); + } + /** + * The CONFIG_GET function should be able to return the Parser configuration + * for a given sensor. + */ @Test - public void testParserGetHappyPath() { + public void testGetParser() throws Exception { + + String out = (String) run("CONFIG_GET('PARSER', 'bro')", context); - Object out = run("CONFIG_GET('PARSER', 'bro')", new HashMap<>(), context); - Assert.assertEquals(goodBroParserConfig, out); + SensorParserConfig actual = SensorParserConfig.fromBytes(out.getBytes()); + SensorParserConfig expected = SensorParserConfig.fromBytes(goodBroParserConfig.getBytes()); + assertEquals(expected, actual); } + /** + * The CONFIG_GET function should NOT return any configuration when the + * Parser configuration for a given sensor is missing AND emptyIfNotPresent = false. + */ @Test - public void testParserGetMissWithoutDefault() { + public void testGetParserMissWithoutDefault() { - { - Object out = run("CONFIG_GET('PARSER', 'brop', false)", new HashMap<>(), context); - Assert.assertNull(out); - } + // expect null because emptyIfNotPresent = false + Object out = run("CONFIG_GET('PARSER', 'sensor', false)", context); + assertNull(out); } + /** + * The CONFIG_GET function should return a default configuration when none + * currently exists. + */ @Test - public void testParserGetMissWithDefault() throws Exception { - JSONObject expected = (JSONObject) new JSONParser().parse(defaultBropParserConfig); + public void testGetParserMissWithDefault() throws Exception { + SensorParserConfig expected = new SensorParserConfig(); { - Object out = run("CONFIG_GET('PARSER', 'brop')", new HashMap<>(), context); - JSONObject actual = (JSONObject) new JSONParser().parse(out.toString().trim()); - Assert.assertEquals(expected, actual); + Object out = run("CONFIG_GET('PARSER', 'sensor')", context); + SensorParserConfig actual = SensorParserConfig.fromBytes(out.toString().getBytes()); + assertEquals(expected, actual); } { - Object out = run("CONFIG_GET('PARSER', 'brop', true)", new HashMap<>(), context); - JSONObject actual = (JSONObject) new JSONParser().parse(out.toString().trim()); - Assert.assertEquals(expected, actual); + Object out = run("CONFIG_GET('PARSER', 'sensor', true)", context); + SensorParserConfig actual = SensorParserConfig.fromBytes(out.toString().getBytes()); + assertEquals(expected, actual); } } - static String goodTestEnrichmentConfig = slurp( SAMPLE_CONFIG_PATH + "/enrichments/test.json"); + /** + * The CONFIG_GET function should be able to return the Enrichment configuration + * for a given sensor. + */ + @Test + public void testGetEnrichment() throws Exception { + + String out = (String) run("CONFIG_GET('ENRICHMENT', 'test')", context); + + SensorEnrichmentConfig actual = SensorEnrichmentConfig.fromBytes(out.getBytes()); + SensorEnrichmentConfig expected = SensorEnrichmentConfig.fromBytes(goodTestEnrichmentConfig.getBytes()); + assertEquals(expected, actual); + } + + /** + * No default configuration should be provided in this case. + */ + @Test + public void testGetEnrichmentMissWithoutDefault() { + + // expect null because emptyIfNotPresent = false + Object out = run("CONFIG_GET('ENRICHMENT', 'sense', false)", context); + assertNull(out); + } /** + * A default empty configuration should be provided, if one does not exist. + */ + @Test + public void testGetEnrichmentMissWithDefault() throws Exception { + + // expect an empty configuration to be returned + SensorEnrichmentConfig expected = new SensorEnrichmentConfig(); { - "enrichment" : { - "fieldMap" : { }, - "fieldToTypeMap" : { }, - "config" : { } - }, - "threatIntel" : { - "fieldMap" : { }, - "fieldToTypeMap" : { }, - "config" : { }, - "triageConfig" : { - "riskLevelRules" : [ ], - "aggregator" : "MAX", - "aggregationConfig" : { } - } - }, - "configuration" : { } + String out = (String) run("CONFIG_GET('ENRICHMENT', 'missing-sensor')", context); + SensorEnrichmentConfig actual = SensorEnrichmentConfig.fromBytes(out.getBytes()); + assertEquals(expected, actual); + } + { + String out = (String) run("CONFIG_GET('ENRICHMENT', 'missing-sensor', true)", context); + SensorEnrichmentConfig actual = SensorEnrichmentConfig.fromBytes(out.getBytes()); + assertEquals(expected, actual); } + } + + /** + * The CONFIG_GET function should be able to return the Indexing configuration + * for a given sensor. */ - @Multiline - static String defaultBropEnrichmentConfig; + @Test + public void testGetIndexing() throws Exception { + String out = (String) run("CONFIG_GET('INDEXING', 'test')", context); + + Map<String, Object> actual = toJSONObject(out); + Map<String, Object> expected = toJSONObject(goodTestIndexingConfig); + assertEquals(expected, actual); + } + /** + * No default configuration should be provided in this case. + */ @Test - public void testEnrichmentGetHappyPath() { + public void testGetIndexingMissWithoutDefault() { - Object out = run("CONFIG_GET('ENRICHMENT', 'test')", new HashMap<>(), context); - Assert.assertEquals(goodTestEnrichmentConfig, out.toString().trim()); + // expect null because emptyIfNotPresent = false + Object out = run("CONFIG_GET('INDEXING', 'sense', false)", context); + assertNull(out); } + /** + * A default empty configuration should be provided, if one does not exist. + */ @Test - public void testEnrichmentGetMissWithoutDefault() { + public void testGetIndexingtMissWithDefault() throws Exception { + // expect an empty configuration to be returned + Map<String, Object> expected = Collections.emptyMap(); + { + String out = (String) run("CONFIG_GET('INDEXING', 'missing-sensor')", context); + Map<String, Object> actual = toJSONObject(out); + assertEquals(expected, actual); + } { - Object out = run("CONFIG_GET('ENRICHMENT', 'brop', false)", new HashMap<>(), context); - Assert.assertNull(out); + String out = (String) run("CONFIG_GET('INDEXING', 'missing-sensor', true)", context); + Map<String, Object> actual = toJSONObject(out); + assertEquals(expected, actual); } } + /** + * The CONFIG_GET function should be able to return the Profiler configuration. + */ + @Test + public void testGetProfiler() throws Exception { + + String out = (String) run("CONFIG_GET('PROFILER')", context); + + ProfilerConfig actual = ProfilerConfig.fromBytes(out.getBytes()); + ProfilerConfig expected = ProfilerConfig.fromBytes(goodProfilerConfig.getBytes()); + assertEquals(expected, actual); + } + + /** + * No default configuration should be provided in this case. + */ @Test - public void testEnrichmentGetMissWithDefault() throws Exception { - JSONObject expected = (JSONObject) new JSONParser().parse(defaultBropEnrichmentConfig); + public void testGetProfilerMissWithoutDefault() throws Exception { + + deletePath(PROFILER.getZookeeperRoot()); + // expect null because emptyIfNotPresent = false + String out = (String) run("CONFIG_GET('PROFILER', false)", context); + assertNull(out); + } + + /** + * A default empty configuration should be provided, if one does not exist. + */ + @Test + public void testGetProfilerMissWithDefault() throws Exception { + + // there is no profiler config in zookeeper + deletePath(PROFILER.getZookeeperRoot()); + + // expect an empty configuration to be returned + ProfilerConfig expected = new ProfilerConfig(); { - Object out = run("CONFIG_GET('ENRICHMENT', 'brop')", new HashMap<>(), context); - JSONObject actual = (JSONObject) new JSONParser().parse(out.toString().trim()); - Assert.assertEquals(expected, actual); + String out = (String) run("CONFIG_GET('PROFILER', true)", context); + ProfilerConfig actual = ProfilerConfig.fromJSON(out); + assertEquals(expected, actual); } { - Object out = run("CONFIG_GET('ENRICHMENT', 'brop', true)", new HashMap<>(), context); - JSONObject actual = (JSONObject) new JSONParser().parse(out.toString().trim()); - Assert.assertEquals(expected, actual); + String out = (String) run("CONFIG_GET('PROFILER')", context); + ProfilerConfig actual = ProfilerConfig.fromJSON(out); + assertEquals(expected, actual); } } - static String goodGlobalConfig = slurp( SAMPLE_CONFIG_PATH+ "/global.json"); + @Test + public void testGetGlobal() throws Exception { + + String out = (String) run("CONFIG_GET('GLOBAL')", context); + + Map<String, Object> actual = toJSONObject(out); + Map<String, Object> expected = toJSONObject(goodGlobalConfig); + assertEquals(expected, actual); + } + + /** + * No default configuration should be provided in this case. + */ + @Test + public void testGetGlobalMissWithoutDefault() throws Exception { + + // there is no global config in zookeeper + deletePath(GLOBAL.getZookeeperRoot()); + + // expect null because emptyIfNotPresent = false + Object out = run("CONFIG_GET('GLOBAL', false)", context); + assertNull(out); + } + /** + * A default empty configuration should be provided, if one does not exist. + */ @Test - public void testGlobalGet() { + public void testGetGlobalMissWithDefault() throws Exception { + + // there is no global config in zookeeper + deletePath(GLOBAL.getZookeeperRoot()); - Object out = run("CONFIG_GET('GLOBAL')", new HashMap<>(), context); - Assert.assertEquals(goodGlobalConfig, out.toString().trim()); + // expect an empty configuration to be returned + Map<String, Object> expected = Collections.emptyMap(); + { + String out = (String) run("CONFIG_GET('GLOBAL')", context); + Map<String, Object> actual = toJSONObject(out); + assertEquals(expected, actual); + } + { + String out = (String) run("CONFIG_GET('GLOBAL', true)", context); + Map<String, Object> actual = toJSONObject(out); + assertEquals(expected, actual); + } } @Test - public void testGlobalPut() { + public void testPutGlobal() throws Exception { + + String out = (String) run("CONFIG_GET('GLOBAL')", context); - Object out = run("CONFIG_GET('GLOBAL')", new HashMap<>(), context); - Assert.assertEquals(goodGlobalConfig, out.toString().trim()); + Map<String, Object> actual = toJSONObject(out); + Map<String, Object> expected = toJSONObject(goodGlobalConfig); + assertEquals(expected, actual); } @Test(expected=ParseException.class) - public void testGlobalPutBad() { + public void testPutGlobalBad() { { UnitTestHelper.setLog4jLevel(ConfigurationFunctions.class, Level.FATAL); try { - run("CONFIG_PUT('GLOBAL', 'foo bar')", new HashMap<>(), context); + run("CONFIG_PUT('GLOBAL', 'foo bar')", context); } catch(ParseException e) { UnitTestHelper.setLog4jLevel(ConfigurationFunctions.class, Level.ERROR); throw e; @@ -215,23 +414,23 @@ public class ConfigurationFunctionsTest { } @Test - public void testIndexingPut() throws InterruptedException { - String brop= (String) run("CONFIG_GET('INDEXING', 'testIndexingPut')", new HashMap<>(), context); + public void testPutIndexing() throws InterruptedException { + String brop= (String) run("CONFIG_GET('INDEXING', 'testIndexingPut')", context); run("CONFIG_PUT('INDEXING', config, 'testIndexingPut')", ImmutableMap.of("config", brop), context); boolean foundMatch = false; for(int i = 0;i < 10 && !foundMatch;++i) { - String bropNew = (String) run("CONFIG_GET('INDEXING', 'testIndexingPut', false)", new HashMap<>(), context); + String bropNew = (String) run("CONFIG_GET('INDEXING', 'testIndexingPut', false)", context); foundMatch = brop.equals(bropNew); if(foundMatch) { break; } Thread.sleep(2000); } - Assert.assertTrue(foundMatch); + assertTrue(foundMatch); } @Test(expected= ParseException.class) - public void testIndexingPutBad() throws InterruptedException { + public void testPutIndexingBad() throws InterruptedException { { { UnitTestHelper.setLog4jLevel(ConfigurationFunctions.class, Level.FATAL); @@ -246,23 +445,26 @@ public class ConfigurationFunctionsTest { } @Test - public void testEnrichmentPut() throws InterruptedException { - String brop= (String) run("CONFIG_GET('ENRICHMENT', 'testEnrichmentPut')", new HashMap<>(), context); - run("CONFIG_PUT('ENRICHMENT', config, 'testEnrichmentPut')", ImmutableMap.of("config", brop), context); + public void testPutEnrichment() throws InterruptedException { + String config = (String) run("CONFIG_GET('ENRICHMENT', 'sensor')", context); + assertNotNull(config); + + run("CONFIG_PUT('ENRICHMENT', config, 'sensor')", ImmutableMap.of("config", config), context); + boolean foundMatch = false; for(int i = 0;i < 10 && !foundMatch;++i) { - String bropNew = (String) run("CONFIG_GET('ENRICHMENT', 'testEnrichmentPut', false)", new HashMap<>(), context); - foundMatch = brop.equals(bropNew); + String newConfig = (String) run("CONFIG_GET('ENRICHMENT', 'sensor', false)", context); + foundMatch = config.equals(newConfig); if(foundMatch) { break; } Thread.sleep(2000); } - Assert.assertTrue(foundMatch); + assertTrue(foundMatch); } @Test(expected= ParseException.class) - public void testEnrichmentPutBad() throws InterruptedException { + public void testPutEnrichmentBad() throws InterruptedException { { { UnitTestHelper.setLog4jLevel(ConfigurationFunctions.class, Level.FATAL); @@ -277,23 +479,23 @@ public class ConfigurationFunctionsTest { } @Test - public void testParserPut() throws InterruptedException { - String brop= (String) run("CONFIG_GET('PARSER', 'testParserPut')", new HashMap<>(), context); + public void testPutParser() throws InterruptedException { + String brop= (String) run("CONFIG_GET('PARSER', 'testParserPut')", context); run("CONFIG_PUT('PARSER', config, 'testParserPut')", ImmutableMap.of("config", brop), context); boolean foundMatch = false; for(int i = 0;i < 10 && !foundMatch;++i) { - String bropNew = (String) run("CONFIG_GET('PARSER', 'testParserPut', false)", new HashMap<>(), context); + String bropNew = (String) run("CONFIG_GET('PARSER', 'testParserPut', false)", context); foundMatch = brop.equals(bropNew); if(foundMatch) { break; } Thread.sleep(2000); } - Assert.assertTrue(foundMatch); + assertTrue(foundMatch); } @Test(expected= ParseException.class) - public void testParserPutBad() throws InterruptedException { + public void testPutParserBad() throws InterruptedException { { UnitTestHelper.setLog4jLevel(ConfigurationFunctions.class, Level.FATAL); try { http://git-wip-us.apache.org/repos/asf/metron/blob/37e3fd32/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/DefaultStellarShellExecutor.java ---------------------------------------------------------------------- diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/DefaultStellarShellExecutor.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/DefaultStellarShellExecutor.java index 781a0cf..352ae2b 100644 --- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/DefaultStellarShellExecutor.java +++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/shell/DefaultStellarShellExecutor.java @@ -52,7 +52,6 @@ import java.io.ByteArrayInputStream; import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -342,15 +341,18 @@ public class DefaultStellarShellExecutor implements StellarShellExecutor { * @param zkClient An optional Zookeeper client. */ private Context createContext(Properties properties, Optional<CuratorFramework> zkClient) throws Exception { + Context.Builder contextBuilder = new Context.Builder(); Map<String, Object> globals; if (zkClient.isPresent()) { + LOG.debug("Zookeeper client present; fetching globals from Zookeeper."); // fetch globals from zookeeper globals = fetchGlobalConfig(zkClient.get()); contextBuilder.with(ZOOKEEPER_CLIENT, () -> zkClient.get()); } else { + LOG.debug("No Zookeeper client; initializing empty globals."); // use empty globals to allow a user to '%define' their own globals = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/metron/blob/37e3fd32/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java ---------------------------------------------------------------------- diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java index 5912657..d5f267e 100644 --- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java +++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/StellarProcessorUtils.java @@ -18,17 +18,18 @@ package org.apache.metron.stellar.common.utils; +import com.google.common.collect.ImmutableList; +import org.apache.metron.stellar.common.StellarPredicateProcessor; +import org.apache.metron.stellar.common.StellarProcessor; import org.apache.metron.stellar.dsl.Context; import org.apache.metron.stellar.dsl.DefaultVariableResolver; import org.apache.metron.stellar.dsl.MapVariableResolver; import org.apache.metron.stellar.dsl.StellarFunctions; import org.apache.metron.stellar.dsl.VariableResolver; -import com.google.common.collect.ImmutableList; -import org.apache.metron.stellar.common.StellarPredicateProcessor; -import org.apache.metron.stellar.common.StellarProcessor; import org.junit.Assert; import java.util.AbstractMap; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Spliterators; @@ -39,39 +40,76 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import java.util.stream.StreamSupport; +/** + * Utilities for executing and validating Stellar expressions. + */ public class StellarProcessorUtils { - /** - * This utility class is intended for use while unit testing Stellar operators. - * It is included in the "main" code so third-party operators will not need - * a test dependency on Stellar's test-jar. - * - * This class ensures the basic contract of a stellar expression is adhered to: - * 1. Validate works on the expression - * 2. The output can be serialized and deserialized properly - * - * @param rule - * @param variables - * @param context - * @return ret - */ - public static Object run(String rule, Map<String, Object> variables, Context context) { - StellarProcessor processor = new StellarProcessor(); - Assert.assertTrue(rule + " not valid.", processor.validate(rule, context)); - Object ret = processor.parse(rule, new DefaultVariableResolver(x -> variables.get(x),x-> variables.containsKey(x)), StellarFunctions.FUNCTION_RESOLVER(), context); - byte[] raw = SerDeUtils.toBytes(ret); - Object actual = SerDeUtils.fromBytes(raw, Object.class); - Assert.assertEquals(ret, actual); - return ret; - } + /** + * Execute and validate a Stellar expression. + * + * <p>This is intended for use while unit testing Stellar expressions. This ensures that the expression + * validates successfully and produces a result that can be serialized correctly. + * + * @param expression The expression to execute. + * @param variables The variables to expose to the expression. + * @param context The execution context. + * @return The result of executing the expression. + */ + public static Object run(String expression, Map<String, Object> variables, Context context) { + + // validate the expression + StellarProcessor processor = new StellarProcessor(); + Assert.assertTrue("Invalid expression; expr=" + expression, + processor.validate(expression, context)); + + // execute the expression + Object ret = processor.parse( + expression, + new DefaultVariableResolver(x -> variables.get(x), x -> variables.containsKey(x)), + StellarFunctions.FUNCTION_RESOLVER(), + context); + + // ensure the result can be serialized/deserialized + byte[] raw = SerDeUtils.toBytes(ret); + Object actual = SerDeUtils.fromBytes(raw, Object.class); + Assert.assertEquals(ret, actual); + + return ret; + } + + /** + * Execute and validate a Stellar expression. + * + * <p>This is intended for use while unit testing Stellar expressions. This ensures that the expression + * validates successfully and produces a result that can be serialized correctly. + * + * @param expression The expression to execute. + * @param variables The variables to expose to the expression. + * @return The result of executing the expression. + */ + public static Object run(String expression, Map<String, Object> variables) { + return run(expression, variables, Context.EMPTY_CONTEXT()); + } - public static Object run(String rule, Map<String, Object> variables) { - return run(rule, variables, Context.EMPTY_CONTEXT()); + /** + * Execute and validate a Stellar expression. + * + * <p>This is intended for use while unit testing Stellar expressions. This ensures that the expression + * validates successfully and produces a result that can be serialized correctly. + * + * @param expression The expression to execute. + * @param context The execution context. + * @return The result of executing the expression. + */ + public static Object run(String expression, Context context) { + return run(expression, Collections.emptyMap(), context); } - public static void validate(String rule, Context context) { + public static void validate(String expression, Context context) { StellarProcessor processor = new StellarProcessor(); - Assert.assertTrue(rule + " not valid.", processor.validate(rule, context)); + Assert.assertTrue("Invalid expression; expr=" + expression, + processor.validate(expression, context)); } public static void validate(String rule) { @@ -101,19 +139,18 @@ public class StellarProcessorUtils { } public static void runWithArguments(String function, List<Object> arguments, Object expected) { - Supplier<Stream<Map.Entry<String, Object>>> kvStream = () -> StreamSupport.stream(new XRange(arguments.size()), false) - .map( i -> new AbstractMap.SimpleImmutableEntry<>("var" + i, arguments.get(i))); + Supplier<Stream<Map.Entry<String, Object>>> kvStream = () -> StreamSupport + .stream(new XRange(arguments.size()), false) + .map(i -> new AbstractMap.SimpleImmutableEntry<>("var" + i, arguments.get(i))); - String args = kvStream.get().map( kv -> kv.getKey()) - .collect(Collectors.joining(",")); + String args = kvStream.get().map(kv -> kv.getKey()).collect(Collectors.joining(",")); Map<String, Object> variables = kvStream.get().collect(Collectors.toMap(kv -> kv.getKey(), kv -> kv.getValue())); - String stellarStatement = function + "(" + args + ")"; + String stellarStatement = function + "(" + args + ")"; String reason = stellarStatement + " != " + expected + " with variables: " + variables; - if(expected instanceof Double) { - Assert.assertEquals(reason, (Double)expected, (Double)run(stellarStatement, variables), 1e-6); - } - else { + if (expected instanceof Double) { + Assert.assertEquals(reason, (Double) expected, (Double) run(stellarStatement, variables), 1e-6); + } else { Assert.assertEquals(reason, expected, run(stellarStatement, variables)); } } @@ -135,10 +172,9 @@ public class StellarProcessorUtils { @Override public boolean tryAdvance(IntConsumer action) { boolean isDone = i >= end; - if(isDone) { + if (isDone) { return false; - } - else { + } else { action.accept(i); i++; return true; @@ -148,25 +184,20 @@ public class StellarProcessorUtils { /** * {@inheritDoc} * - * @param action - * to {@code IntConsumer} and passed to - * {@link #tryAdvance(IntConsumer)}; otherwise - * the action is adapted to an instance of {@code IntConsumer}, by - * boxing the argument of {@code IntConsumer}, and then passed to - * {@link #tryAdvance(IntConsumer)}. + * @param action to {@code IntConsumer} and passed to {@link #tryAdvance(IntConsumer)}; + * otherwise the action is adapted to an instance of {@code IntConsumer}, by boxing the + * argument of {@code IntConsumer}, and then passed to {@link #tryAdvance(IntConsumer)}. */ @Override public boolean tryAdvance(Consumer<? super Integer> action) { boolean isDone = i >= end; - if(isDone) { + if (isDone) { return false; - } - else { + } else { action.accept(i); i++; return true; } } } - }
