http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java index 6c518b1..ecf8a1b 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/Configurations.java @@ -36,11 +36,19 @@ import org.slf4j.LoggerFactory; public class Configurations implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private List<FieldValidator> validations = new ArrayList<>(); - protected ConcurrentMap<String, Object> configurations = new ConcurrentHashMap<>(); + protected Map<String, Object> configurations = new ConcurrentHashMap<>(); + + public Map<String, Object> getConfigurations() { + return configurations; + } @SuppressWarnings("unchecked") public Map<String, Object> getGlobalConfig() { - return (Map<String, Object>) configurations.getOrDefault(ConfigurationType.GLOBAL.getTypeName(), new HashMap()); + return getGlobalConfig(true); + } + + public Map<String, Object> getGlobalConfig(boolean emptyMapOnNonExistent) { + return (Map<String, Object>) getConfigurations().getOrDefault(ConfigurationType.GLOBAL.getTypeName(), emptyMapOnNonExistent?new HashMap():null); } public List<FieldValidator> getFieldValidations() { @@ -59,10 +67,15 @@ public class Configurations implements Serializable { } public void updateGlobalConfig(Map<String, Object> globalConfig) { - configurations.put(ConfigurationType.GLOBAL.getTypeName(), globalConfig); - validations = 
FieldValidator.readValidations(getGlobalConfig()); + if(globalConfig != null) { + getConfigurations().put(ConfigurationType.GLOBAL.getTypeName(), globalConfig); + validations = FieldValidator.readValidations(getGlobalConfig()); + } } + public void deleteGlobalConfig() { + getConfigurations().remove(ConfigurationType.GLOBAL.getTypeName()); + } @Override public boolean equals(Object o) { @@ -72,14 +85,14 @@ public class Configurations implements Serializable { Configurations that = (Configurations) o; if (validations != null ? !validations.equals(that.validations) : that.validations != null) return false; - return configurations != null ? configurations.equals(that.configurations) : that.configurations == null; + return getConfigurations() != null ? getConfigurations().equals(that.getConfigurations()) : that.getConfigurations() == null; } @Override public int hashCode() { int result = validations != null ? validations.hashCode() : 0; - result = 31 * result + (configurations != null ? configurations.hashCode() : 0); + result = 31 * result + (getConfigurations() != null ? getConfigurations().hashCode() : 0); return result; } @@ -87,7 +100,7 @@ public class Configurations implements Serializable { public String toString() { return "Configurations{" + "validations=" + validations + - ", configurations=" + configurations + + ", configurations=" + getConfigurations()+ '}'; } }
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java index fcc8050..ae21152 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ConfigurationsUtils.java @@ -28,11 +28,16 @@ import java.io.ByteArrayInputStream; import java.io.File; import java.io.IOException; import java.io.PrintStream; +import java.lang.invoke.MethodHandles; import java.nio.file.Files; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + import org.apache.commons.io.FilenameUtils; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; @@ -44,8 +49,11 @@ import org.apache.metron.common.utils.JSONUtils; import org.apache.metron.stellar.dsl.Context; import org.apache.metron.stellar.dsl.StellarFunctions; import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class ConfigurationsUtils { + protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); public static CuratorFramework getClient(String zookeeperUrl) { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); @@ -175,28 +183,56 @@ public class ConfigurationsUtils { configurations.updateGlobalConfig(readGlobalConfigBytesFromZookeeper(client)); } - public static void 
updateParserConfigsFromZookeeper(ParserConfigurations configurations, CuratorFramework client) throws Exception { - updateConfigsFromZookeeper(configurations, client); - List<String> sensorTypes = client.getChildren().forPath(PARSER.getZookeeperRoot()); + + private interface Callback { + void apply(String sensorType) throws Exception; + } + + private static void updateConfigsFromZookeeper( Configurations configurations + , ConfigurationType type + , Callback callback + , CuratorFramework client + ) throws Exception + { + Exception globalUpdateException = null; + try { + updateConfigsFromZookeeper(configurations, client); + } + catch(Exception e) { + LOG.warn("Unable to update global config when updating " + type.getTypeName() + " configs: " + e.getMessage(), e); + globalUpdateException = e; /* rethrown after the per-sensor callbacks have run */ + } + List<String> sensorTypes = client.getChildren().forPath(type.getZookeeperRoot()); for(String sensorType: sensorTypes) { - configurations.updateSensorParserConfig(sensorType, readSensorParserConfigBytesFromZookeeper(sensorType, client)); + callback.apply(sensorType); } + if(globalUpdateException != null) { + throw globalUpdateException; + } + } + + public static void updateParserConfigsFromZookeeper(ParserConfigurations configurations, CuratorFramework client) throws Exception { + updateConfigsFromZookeeper( configurations + , PARSER + , sensorType -> configurations.updateSensorParserConfig(sensorType, readSensorParserConfigBytesFromZookeeper(sensorType, client)) + , client + ); } public static void updateSensorIndexingConfigsFromZookeeper(IndexingConfigurations configurations, CuratorFramework client) throws Exception { - updateConfigsFromZookeeper(configurations, client); - List<String> sensorTypes = client.getChildren().forPath(INDEXING.getZookeeperRoot()); - for(String sensorType: sensorTypes) { - configurations.updateSensorIndexingConfig(sensorType, readSensorEnrichmentConfigBytesFromZookeeper(sensorType, client)); - } + updateConfigsFromZookeeper( configurations + , INDEXING + , 
sensorType -> configurations.updateSensorIndexingConfig(sensorType, readSensorIndexingConfigBytesFromZookeeper(sensorType, client)) + , client + ); } public static void updateEnrichmentConfigsFromZookeeper(EnrichmentConfigurations configurations, CuratorFramework client) throws Exception { - updateConfigsFromZookeeper(configurations, client); - List<String> sensorTypes = client.getChildren().forPath(ENRICHMENT.getZookeeperRoot()); - for(String sensorType: sensorTypes) { - configurations.updateSensorEnrichmentConfig(sensorType, readSensorEnrichmentConfigBytesFromZookeeper(sensorType, client)); - } + updateConfigsFromZookeeper( configurations + , ENRICHMENT + , sensorType -> configurations.updateSensorEnrichmentConfig(sensorType, readSensorEnrichmentConfigBytesFromZookeeper(sensorType, client)) + , client + ); } public static SensorEnrichmentConfig readSensorEnrichmentConfigFromZookeeper(String sensorType, CuratorFramework client) throws Exception { http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfigurations.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfigurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfigurations.java index a28b15c..dfd7a65 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfigurations.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/EnrichmentConfigurations.java @@ -23,27 +23,43 @@ import org.apache.metron.common.utils.JSONUtils; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; public class EnrichmentConfigurations extends Configurations { - 
public SensorEnrichmentConfig getSensorEnrichmentConfig(String sensorType) { - return (SensorEnrichmentConfig) configurations.get(getKey(sensorType)); - } + public SensorEnrichmentConfig getSensorEnrichmentConfig(String sensorType) { + return (SensorEnrichmentConfig) getConfigurations().get(getKey(sensorType)); + } - public void updateSensorEnrichmentConfig(String sensorType, byte[] data) throws IOException { - updateSensorEnrichmentConfig(sensorType, new ByteArrayInputStream(data)); - } + public void updateSensorEnrichmentConfig(String sensorType, byte[] data) throws IOException { + updateSensorEnrichmentConfig(sensorType, new ByteArrayInputStream(data)); + } - public void updateSensorEnrichmentConfig(String sensorType, InputStream io) throws IOException { - SensorEnrichmentConfig sensorEnrichmentConfig = JSONUtils.INSTANCE.load(io, SensorEnrichmentConfig.class); - updateSensorEnrichmentConfig(sensorType, sensorEnrichmentConfig); - } + public void updateSensorEnrichmentConfig(String sensorType, InputStream io) throws IOException { + SensorEnrichmentConfig sensorEnrichmentConfig = JSONUtils.INSTANCE.load(io, SensorEnrichmentConfig.class); + updateSensorEnrichmentConfig(sensorType, sensorEnrichmentConfig); + } - public void updateSensorEnrichmentConfig(String sensorType, SensorEnrichmentConfig sensorEnrichmentConfig) { - configurations.put(getKey(sensorType), sensorEnrichmentConfig); - } + public void updateSensorEnrichmentConfig(String sensorType, SensorEnrichmentConfig sensorEnrichmentConfig) { + getConfigurations().put(getKey(sensorType), sensorEnrichmentConfig); + } + + public void delete(String sensorType) { + getConfigurations().remove(getKey(sensorType)); + } - private String getKey(String sensorType) { - return ConfigurationType.ENRICHMENT.getTypeName() + "." 
+ sensorType; + public List<String> getTypes() { + List<String> ret = new ArrayList<>(); + for(String keyedSensor : getConfigurations().keySet()) { + if(!keyedSensor.isEmpty() && keyedSensor.startsWith(ConfigurationType.ENRICHMENT.getTypeName())) { + ret.add(keyedSensor.substring(ConfigurationType.ENRICHMENT.getTypeName().length() + 1)); + } } + return ret; + } + + public static String getKey(String sensorType) { + return ConfigurationType.ENRICHMENT.getTypeName() + "." + sensorType; + } } http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java index 3bd7645..003b6df 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/IndexingConfigurations.java @@ -33,8 +33,36 @@ public class IndexingConfigurations extends Configurations { public static final String INDEX_CONF = "index"; public static final String OUTPUT_PATH_FUNCTION_CONF = "outputPathFunction"; + public Map<String, Object> getSensorIndexingConfig(String sensorType, boolean emptyMapOnNonExistent) { + Map<String, Object> ret = (Map<String, Object>) getConfigurations().get(getKey(sensorType)); + if(ret == null) { + return emptyMapOnNonExistent?new HashMap<>():null; + } + else { + return ret; + } + } + + public Map<String, Object> getSensorIndexingConfig(String sensorType) { + return getSensorIndexingConfig(sensorType, true); + } + + public List<String> getTypes() { + List<String> ret = new ArrayList<>(); + for(String keyedSensor : 
getConfigurations().keySet()) { + if(!keyedSensor.isEmpty() && keyedSensor.startsWith(ConfigurationType.INDEXING.getTypeName())) { + ret.add(keyedSensor.substring(ConfigurationType.INDEXING.getTypeName().length() + 1)); + } + } + return ret; + } + + public void delete(String sensorType) { + getConfigurations().remove(getKey(sensorType)); + } + public Map<String, Object> getSensorIndexingConfig(String sensorType, String writerName) { - Map<String, Object> ret = (Map<String, Object>) configurations.get(getKey(sensorType)); + Map<String, Object> ret = (Map<String, Object>) getConfigurations().get(getKey(sensorType)); if(ret == null) { return new HashMap(); } @@ -55,15 +83,15 @@ public class IndexingConfigurations extends Configurations { } public void updateSensorIndexingConfig(String sensorType, Map<String, Object> sensorIndexingConfig) { - configurations.put(getKey(sensorType), sensorIndexingConfig); + getConfigurations().put(getKey(sensorType), sensorIndexingConfig); } - private String getKey(String sensorType) { + public static String getKey(String sensorType) { return ConfigurationType.INDEXING.getTypeName() + "." 
+ sensorType; } public boolean isDefault(String sensorName, String writerName) { - Map<String, Object> ret = (Map<String, Object>) configurations.get(getKey(sensorName)); + Map<String, Object> ret = (Map<String, Object>) getConfigurations().get(getKey(sensorName)); if(ret == null) { return true; } @@ -100,7 +128,7 @@ public class IndexingConfigurations extends Configurations { String keyPrefixString = getKey(""); int prefixStringLength = keyPrefixString.length(); List<Integer> configuredBatchTimeouts = new ArrayList<>(); - for (String sensorKeyString : configurations.keySet()) { + for (String sensorKeyString : getConfigurations().keySet()) { if (sensorKeyString.startsWith(keyPrefixString)) { String configuredSensorName = sensorKeyString.substring(prefixStringLength); configuredBatchTimeouts.add(getBatchTimeout(configuredSensorName, writerName)); http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java index 0ec0ed4..72af833 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/ParserConfigurations.java @@ -22,11 +22,13 @@ import org.apache.metron.common.utils.JSONUtils; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; public class ParserConfigurations extends Configurations { public SensorParserConfig getSensorParserConfig(String sensorType) { - return (SensorParserConfig) 
configurations.get(getKey(sensorType)); + return (SensorParserConfig) getConfigurations().get(getKey(sensorType)); } public void updateSensorParserConfig(String sensorType, byte[] data) throws IOException { @@ -40,10 +42,24 @@ public class ParserConfigurations extends Configurations { public void updateSensorParserConfig(String sensorType, SensorParserConfig sensorParserConfig) { sensorParserConfig.init(); - configurations.put(getKey(sensorType), sensorParserConfig); + getConfigurations().put(getKey(sensorType), sensorParserConfig); } - private String getKey(String sensorType) { + public List<String> getTypes() { + List<String> ret = new ArrayList<>(); + for(String keyedSensor : getConfigurations().keySet()) { + if(!keyedSensor.isEmpty() && keyedSensor.startsWith(ConfigurationType.PARSER.getTypeName())) { + ret.add(keyedSensor.substring(ConfigurationType.PARSER.getTypeName().length() + 1)); + } + } + return ret; + } + + public void delete(String sensorType) { + getConfigurations().remove(getKey(sensorType)); + } + + public static String getKey(String sensorType) { return ConfigurationType.PARSER.getTypeName() + "." 
+ sensorType; } } http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResult.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResult.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResult.java index 9a42426..55642a9 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResult.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResult.java @@ -80,6 +80,14 @@ public class ProfileResult { } @Override + public String toString() { + return "ProfileResult{" + + "profileExpressions=" + profileExpressions + + ", triageExpressions=" + triageExpressions + + '}'; + } + + @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java index 1bca716..82af223 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileResultExpressions.java @@ -43,6 +43,13 @@ public class ProfileResultExpressions { } @Override + public 
String toString() { + return "ProfileResultExpressions{" + + "expression='" + expression + '\'' + + '}'; + } + + @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java index da74f64..fbe1706 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfileTriageExpressions.java @@ -64,4 +64,27 @@ public class ProfileTriageExpressions { public Map<String, String> getExpressions() { return expressions; } + + @Override + public String toString() { + return "ProfileTriageExpressions{" + + "expressions=" + expressions + + '}'; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ProfileTriageExpressions that = (ProfileTriageExpressions) o; + + return getExpressions() != null ? getExpressions().equals(that.getExpressions()) : that.getExpressions() == null; + + } + + @Override + public int hashCode() { + return getExpressions() != null ? 
getExpressions().hashCode() : 0; + } } http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java index cd651bd..e7c081a 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfig.java @@ -40,6 +40,13 @@ public class ProfilerConfig implements Serializable { } @Override + public String toString() { + return "ProfilerConfig{" + + "profiles=" + profiles + + '}'; + } + + @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java index e001d74..f50d770 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/configuration/profiler/ProfilerConfigurations.java @@ -31,7 +31,7 @@ import java.io.InputStream; public class ProfilerConfigurations extends Configurations { 
public ProfilerConfig getProfilerConfig() { - return (ProfilerConfig) configurations.get(getKey()); + return (ProfilerConfig) getConfigurations().get(getKey()); } public void updateProfilerConfig(byte[] data) throws IOException { @@ -44,10 +44,15 @@ public class ProfilerConfigurations extends Configurations { } public void updateProfilerConfig(ProfilerConfig config) { - configurations.put(getKey(), config); + getConfigurations().put(getKey(), config); } - private String getKey() { + public static String getKey() { return ConfigurationType.PROFILER.getTypeName(); } + + public void delete() { + getConfigurations().remove(getKey()); + } + } http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/ConfigurationsCache.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/ConfigurationsCache.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/ConfigurationsCache.java new file mode 100644 index 0000000..462d754 --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/ConfigurationsCache.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.common.zookeeper; + +import org.apache.metron.common.configuration.Configurations; + +/** + * A cache for multiple Configurations objects. This cache is generally kept in + * sync with zookeeper changes. + */ +public interface ConfigurationsCache extends AutoCloseable{ + /** + * Return the Configurations object given the specific type of Configurations object. + * @param configClass The Configurations class to return + * @param <T> The type of Configurations class. + * @return The Configurations object + */ + <T extends Configurations> T get(Class<T> configClass); + + /** + * Reset the cache and reload from zookeeper. + */ + void reset(); + + /** + * Start the cache. + */ + void start(); +} http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/ZKConfigurationsCache.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/ZKConfigurationsCache.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/ZKConfigurationsCache.java new file mode 100644 index 0000000..42967b2 --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/ZKConfigurationsCache.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.common.zookeeper; + +import com.google.common.collect.Iterables; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.metron.common.Constants; +import org.apache.metron.common.configuration.*; +import org.apache.metron.common.configuration.profiler.ProfilerConfigurations; +import org.apache.metron.zookeeper.SimpleEventListener; +import org.apache.metron.zookeeper.ZKCache; +import org.apache.metron.common.zookeeper.configurations.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; + +public class ZKConfigurationsCache implements ConfigurationsCache { + private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + + + private interface UpdaterSupplier { + ConfigurationsUpdater<? extends Configurations> create(Map<Class<? 
extends Configurations>, Configurations> configs,Reloadable reloadCallback); + } + + public enum ConfiguredTypes implements UpdaterSupplier { + ENRICHMENT((c,r) -> new EnrichmentUpdater( r, createSupplier(EnrichmentConfigurations.class, c))) + ,PARSER((c,r) -> new ParserUpdater( r, createSupplier(ParserConfigurations.class, c))) + ,INDEXING((c,r) -> new IndexingUpdater( r, createSupplier(IndexingConfigurations.class, c))) + ,PROFILER((c,r) -> new ProfilerUpdater( r, createSupplier(ProfilerConfigurations.class, c))) + ; + UpdaterSupplier updaterSupplier; + ConfiguredTypes(UpdaterSupplier supplier) { + this.updaterSupplier = supplier; + } + + @Override + public ConfigurationsUpdater<? extends Configurations> + create(Map<Class<? extends Configurations>, Configurations> configs, Reloadable reloadCallback) { + return updaterSupplier.create(configs, reloadCallback); + } + } + + private List<ConfigurationsUpdater< ? extends Configurations>> updaters; + private final Map<Class<? extends Configurations>, Configurations> configs; + private ZKCache cache; + private CuratorFramework client; + ReadWriteLock lock = new ReentrantReadWriteLock(); + + public ZKConfigurationsCache(CuratorFramework client, Reloadable reloadable, ConfiguredTypes... types) { + updaters = new ArrayList<>(); + configs = new HashMap<>(); + this.client = client; + for(ConfiguredTypes t : types) { + ConfigurationsUpdater<? extends Configurations> updater = t.create(configs, reloadable); + configs.put(updater.getConfigurationClass(), updater.defaultConfigurations()); + updaters.add(updater); + } + } + + public ZKConfigurationsCache(CuratorFramework client, Reloadable reloadable) { + this(client, reloadable, ConfiguredTypes.values()); + } + + public ZKConfigurationsCache(CuratorFramework client, ConfiguredTypes... 
types) { + this(client, (name, type) -> {}, types); + } + + public ZKConfigurationsCache(CuratorFramework client) { + this(client, ConfiguredTypes.values()); + } + + private static <T extends Configurations> Supplier<T> createSupplier(Class<T> clazz, Map<Class<? extends Configurations>, Configurations> configs) { + return () -> clazz.cast(configs.get(clazz)); + } + + @Override + public void start() { + initializeCache(client); + } + + @Override + public void close() { + Lock writeLock = lock.writeLock(); + try { + writeLock.lock(); + if (cache != null) { + cache.close(); + } + } + finally{ + writeLock.unlock(); + } + } + + public void reset() { + Lock writeLock = lock.writeLock(); + try { + writeLock.lock(); + close(); + initializeCache(client); + } + finally{ + writeLock.unlock(); + } + } + + private void initializeCache(CuratorFramework client) { + Lock writeLock = lock.writeLock(); + try { + writeLock.lock(); + SimpleEventListener listener = new SimpleEventListener.Builder() + .with(Iterables.transform(updaters, u -> u::update) + , TreeCacheEvent.Type.NODE_ADDED + , TreeCacheEvent.Type.NODE_UPDATED + ) + .with(Iterables.transform(updaters, u -> u::delete) + , TreeCacheEvent.Type.NODE_REMOVED + ) + .build(); + cache = new ZKCache.Builder() + .withClient(client) + .withListener(listener) + .withRoot(Constants.ZOOKEEPER_TOPOLOGY_ROOT) + .build(); + + for (ConfigurationsUpdater<? 
extends Configurations> updater : updaters) { + updater.forceUpdate(client); + } + cache.start(); + } catch (Exception e) { + LOG.error("Unable to initialize zookeeper cache: " + e.getMessage(), e); + throw new IllegalStateException("Unable to initialize zookeeper cache: " + e.getMessage(), e); + } + finally { + writeLock.unlock(); + } + } + + + public <T extends Configurations> T get(Class<T> configClass){ + Lock readLock = lock.readLock(); + try { + readLock.lock(); + return configClass.cast(configs.get(configClass)); + } + finally{ + readLock.unlock(); + } + } + + +} http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ConfigurationsUpdater.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ConfigurationsUpdater.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ConfigurationsUpdater.java new file mode 100644 index 0000000..9ff2bee --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ConfigurationsUpdater.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.common.zookeeper.configurations; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.metron.common.configuration.ConfigurationType; +import org.apache.metron.common.configuration.Configurations; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.Serializable; +import java.lang.invoke.MethodHandles; +import java.util.function.Supplier; + +/** + * Handles update for an underlying Configurations object. This is the base abstract implementation. + * You will find system-specific implementations (e.g. IndexingUpdater, ParserUpdater, etc.) which + * correspond to the various components of our system which accept configuration from zookeeper. + * + * @param <T> the Type of Configuration + */ +public abstract class ConfigurationsUpdater<T extends Configurations> implements Serializable { + protected static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private Reloadable reloadable; + private Supplier<T> configSupplier; + + /** + * Construct a ConfigurationsUpdater + * @param reloadable A callback which gets called whenever a reload happens + * @param configSupplier A Supplier which creates the Configurations object. + */ + public ConfigurationsUpdater(Reloadable reloadable + , Supplier<T> configSupplier + ) + { + this.reloadable = reloadable; + this.configSupplier = configSupplier; + } + + /** + * Update callback, this is called whenever a path is updated in zookeeper which we are monitoring. + * + * @param client The CuratorFramework + * @param path The zookeeper path + * @param data The change. + * @throws IOException When update is impossible. 
+ */ + public void update(CuratorFramework client, String path, byte[] data) throws IOException { + /* A null or empty payload carries nothing to apply; testing for null first avoids an NPE on data.length. */ + if (data != null && data.length != 0) { + String name = path.substring(path.lastIndexOf("/") + 1); + if (path.startsWith(getType().getZookeeperRoot())) { + LOG.debug("Updating the {} config: {} -> {}", getType().name(), name, new String(data)); + update(name, data); + reloadCallback(name, getType()); + } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) { + LOG.debug("Updating the global config: {}", new String(data)); + getConfigurations().updateGlobalConfig(data); + reloadCallback(name, ConfigurationType.GLOBAL); + } + } + } + + /** + * Delete callback, this is called whenever a path is deleted in zookeeper which we are monitoring. + * Paths under this updater's zookeeper root are removed through delete(name); the global + * configuration path removes the global config. Any other path is ignored. + * + * @param client The CuratorFramework + * @param path The zookeeper path + * @param data The change (not read by this callback). + * @throws IOException When update is impossible. + */ + public void delete(CuratorFramework client, String path, byte[] data) throws IOException { + String name = path.substring(path.lastIndexOf("/") + 1); + if (path.startsWith(getType().getZookeeperRoot())) { + LOG.debug("Deleting {} {} config from internal cache", getType().name(), name); + delete(name); + reloadCallback(name, getType()); + } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) { + LOG.debug("Deleting global config from internal cache"); + getConfigurations().deleteGlobalConfig(); + reloadCallback(name, ConfigurationType.GLOBAL); + } + } + + /** + * The ConfigurationsType that we're monitoring. + * @return The ConfigurationsType enum + */ + public abstract ConfigurationType getType(); + + /** + * The simple update. This differs from the full update elsewhere in that + * this is ONLY called on updates to path to the zookeeper nodes which correspond + * to your configurations type (rather than all configurations type). 
+ * @param name The last segment of the zookeeper path (the text after the final "/") + * @param data The data updated + * @throws IOException when update is unable to happen + */ + public abstract void update(String name, byte[] data) throws IOException; + + /** + * The simple delete. This differs from the full delete elsewhere in that + * this is ONLY called on deletes to paths of the zookeeper nodes which correspond + * to your configurations type (rather than all configuration types). + * @param name The last segment of the zookeeper path (the text after the final "/") + */ + public abstract void delete(String name); + + /** + * Get the class of the Configurations type handled by this updater. + * @return The Class for the Configurations type. + */ + public abstract Class<T> getConfigurationClass(); + + /** + * This pulls the configuration from zookeeper and updates the cache. It represents the initial state. + * Force update is called when the zookeeper cache is initialized to ensure that the caches are updated. + * @param client The CuratorFramework used to read the configuration from zookeeper + */ + public abstract void forceUpdate(CuratorFramework client); + + /** + * Create an empty Configurations object of type T. 
+ * @return A new, empty Configurations object of type T + */ + public abstract T defaultConfigurations(); + + /** + * Forwards a reload notification to the Reloadable supplied at construction. + * @param name The name of the configuration that changed + * @param type The ConfigurationType that changed + */ + protected void reloadCallback(String name, ConfigurationType type) { + reloadable.reloadCallback(name, type); + } + + /** + * Get the Configurations object this updater operates on. + * @return The object produced by the Supplier given at construction + */ + protected T getConfigurations() { + return configSupplier.get(); + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/EnrichmentUpdater.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/EnrichmentUpdater.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/EnrichmentUpdater.java new file mode 100644 index 0000000..29de525 --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/EnrichmentUpdater.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.common.zookeeper.configurations; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.metron.common.configuration.ConfigurationType; +import org.apache.metron.common.configuration.ConfigurationsUtils; +import org.apache.metron.common.configuration.EnrichmentConfigurations; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.invoke.MethodHandles; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; + +/** + * The ConfigurationsUpdater for EnrichmentConfigurations. It reports ConfigurationType.ENRICHMENT + * as its type and delegates per-sensor updates and deletes to the supplied EnrichmentConfigurations. + */ +public class EnrichmentUpdater extends ConfigurationsUpdater<EnrichmentConfigurations>{ + + /** + * @param reloadable A callback which gets called whenever a reload happens + * @param configSupplier A Supplier of the EnrichmentConfigurations to update + */ + public EnrichmentUpdater(Reloadable reloadable, Supplier<EnrichmentConfigurations> configSupplier) { + super(reloadable, configSupplier); + } + + @Override + public Class<EnrichmentConfigurations> getConfigurationClass() { + return EnrichmentConfigurations.class; + } + + /** + * Loads the enrichment configs from zookeeper into the supplied configurations. This is + * best-effort: failures are logged as warnings and not rethrown. + * @param client The CuratorFramework used to read from zookeeper + */ + @Override + public void forceUpdate(CuratorFramework client) { + try { + ConfigurationsUtils.updateEnrichmentConfigsFromZookeeper(getConfigurations(), client); + } + catch (KeeperException.NoNodeException nne) { + /* Deliberately not rethrown: a missing node only means nothing has been written yet. */ + LOG.warn("No current enrichment configs in zookeeper, but the cache should load lazily..."); + } + catch (Exception e) { + /* LOG is inherited from ConfigurationsUpdater, so the message itself has to name the config type. */ + LOG.warn("Unable to load enrichment configs from zookeeper, but the cache should load lazily...", e); + } + } + + @Override + public EnrichmentConfigurations defaultConfigurations() { + return new EnrichmentConfigurations(); + } + + @Override + public ConfigurationType getType() { + return ConfigurationType.ENRICHMENT; + } + + @Override + public void delete(String name) { + getConfigurations().delete(name); + } + + @Override + public void update(String name, byte[] data) throws IOException { + getConfigurations().updateSensorEnrichmentConfig(name, data); + } + +} 
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/IndexingUpdater.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/IndexingUpdater.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/IndexingUpdater.java new file mode 100644 index 0000000..8017930 --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/IndexingUpdater.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.common.zookeeper.configurations; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.metron.common.configuration.ConfigurationType; +import org.apache.metron.common.configuration.ConfigurationsUtils; +import org.apache.metron.common.configuration.IndexingConfigurations; +import org.apache.zookeeper.KeeperException; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; + +/** + * The ConfigurationsUpdater for IndexingConfigurations. It reports ConfigurationType.INDEXING + * as its type and delegates per-sensor updates and deletes to the supplied IndexingConfigurations. + */ +public class IndexingUpdater extends ConfigurationsUpdater<IndexingConfigurations> { + /** + * @param reloadable A callback which gets called whenever a reload happens + * @param configSupplier A Supplier of the IndexingConfigurations to update + */ + public IndexingUpdater(Reloadable reloadable, Supplier<IndexingConfigurations> configSupplier) { + super(reloadable, configSupplier); + } + + @Override + public Class<IndexingConfigurations> getConfigurationClass() { + return IndexingConfigurations.class; + } + + /** + * Loads the indexing configs from zookeeper into the supplied configurations. This is + * best-effort: failures are logged as warnings and not rethrown. + * @param client The CuratorFramework used to read from zookeeper + */ + @Override + public void forceUpdate(CuratorFramework client) { + try { + ConfigurationsUtils.updateSensorIndexingConfigsFromZookeeper(getConfigurations(), client); + } + catch (KeeperException.NoNodeException nne) { + /* Deliberately not rethrown: a missing node only means nothing has been written yet. */ + LOG.warn("No current indexing configs in zookeeper, but the cache should load lazily..."); + } + catch (Exception e) { + LOG.warn("Unable to load indexing configs from zookeeper, but the cache should load lazily...", e); + } + } + + @Override + public IndexingConfigurations defaultConfigurations() { + return new IndexingConfigurations(); + } + + @Override + public ConfigurationType getType() { + return ConfigurationType.INDEXING; + } + + @Override + public void delete(String name) { + getConfigurations().delete(name); + } + + @Override + public void update(String name, byte[] data) throws IOException { + getConfigurations().updateSensorIndexingConfig(name, data); + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ParserUpdater.java ---------------------------------------------------------------------- diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ParserUpdater.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ParserUpdater.java new file mode 100644 index 0000000..c91844e --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ParserUpdater.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.common.zookeeper.configurations; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.metron.common.configuration.ConfigurationType; +import org.apache.metron.common.configuration.ConfigurationsUtils; +import org.apache.metron.common.configuration.ParserConfigurations; +import org.apache.zookeeper.KeeperException; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; + +/** + * The ConfigurationsUpdater for ParserConfigurations. It reports ConfigurationType.PARSER + * as its type and delegates per-sensor updates and deletes to the supplied ParserConfigurations. + */ +public class ParserUpdater extends ConfigurationsUpdater<ParserConfigurations> { + /** + * @param reloadable A callback which gets called whenever a reload happens + * @param configSupplier A Supplier of the ParserConfigurations to update + */ + public ParserUpdater(Reloadable reloadable, Supplier<ParserConfigurations> configSupplier) { + super(reloadable, configSupplier); + } + + @Override + public Class<ParserConfigurations> getConfigurationClass() { + return ParserConfigurations.class; + } + + /** + * Loads the parser configs from zookeeper into the supplied configurations. This is + * best-effort: failures are logged as warnings and not rethrown. + * @param client The CuratorFramework used to read from zookeeper + */ + @Override + public void forceUpdate(CuratorFramework client) { + try { + ConfigurationsUtils.updateParserConfigsFromZookeeper(getConfigurations(), client); + } + catch (KeeperException.NoNodeException nne) { + /* Deliberately not rethrown: a missing node only means nothing has been written yet. */ + LOG.warn("No current parser configs in zookeeper, but the cache should load lazily..."); + } + catch (Exception e) { + /* LOG is inherited from ConfigurationsUpdater, so the message itself has to name the config type. */ + LOG.warn("Unable to load parser configs from zookeeper, but the cache should load lazily...", e); + } + } + + @Override + public ParserConfigurations defaultConfigurations() { + return new ParserConfigurations(); + } + + @Override + public ConfigurationType getType() { + return ConfigurationType.PARSER; + } + + @Override + public void delete(String name) { + getConfigurations().delete(name); + } + + @Override + public void update(String name, byte[] data) throws IOException { + getConfigurations().updateSensorParserConfig(name, data); + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ProfilerUpdater.java ---------------------------------------------------------------------- diff --git 
a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ProfilerUpdater.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ProfilerUpdater.java new file mode 100644 index 0000000..68c5203 --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/ProfilerUpdater.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.common.zookeeper.configurations; + +import static org.apache.metron.common.configuration.ConfigurationType.PROFILER; +import org.apache.curator.framework.CuratorFramework; +import org.apache.metron.common.configuration.ConfigurationType; +import org.apache.metron.common.configuration.ConfigurationsUtils; +import org.apache.metron.common.configuration.profiler.ProfilerConfig; +import org.apache.metron.common.configuration.profiler.ProfilerConfigurations; +import org.apache.metron.common.utils.JSONUtils; +import org.apache.zookeeper.KeeperException; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.function.Supplier; + +public class ProfilerUpdater extends ConfigurationsUpdater<ProfilerConfigurations> { + public ProfilerUpdater(Reloadable reloadable, Supplier<ProfilerConfigurations> configSupplier) { + super(reloadable, configSupplier); + } + + @Override + public Class<ProfilerConfigurations> getConfigurationClass() { + return ProfilerConfigurations.class; + } + + private ProfilerConfig readFromZookeeper(CuratorFramework client) throws Exception { + byte[] raw = client.getData().forPath(PROFILER.getZookeeperRoot()); + return JSONUtils.INSTANCE.load(new ByteArrayInputStream(raw), ProfilerConfig.class); + } + + @Override + public void forceUpdate(CuratorFramework client) { + try { + ConfigurationsUtils.updateConfigsFromZookeeper(getConfigurations(), client); + } + catch (KeeperException.NoNodeException nne) { + LOG.warn("No current global configs in zookeeper, but the cache should load lazily..."); + } + catch(Exception e) { + LOG.warn("Unable to load global configs from zookeeper, but the cache should load lazily...", e); + } + try { + ProfilerConfig config = readFromZookeeper(client); + if(config != null) { + getConfigurations().updateProfilerConfig(config); + } + + } + catch (KeeperException.NoNodeException nne) { + LOG.warn("No current profiler 
configs in zookeeper, but the cache should load lazily..."); + } + catch (Exception e) { + LOG.warn("Unable to load profiler configs from zookeeper, but the cache should load lazily...", e); + } + } + + @Override + public ProfilerConfigurations defaultConfigurations() { + return new ProfilerConfigurations(); + } + + @Override + public ConfigurationType getType() { + return ConfigurationType.PROFILER; + } + + @Override + public void delete(String name) { + getConfigurations().delete(); + } + + @Override + public void update(String name, byte[] data) throws IOException { + getConfigurations().updateProfilerConfig(data); + } + +} http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/Reloadable.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/Reloadable.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/Reloadable.java new file mode 100644 index 0000000..308c74e --- /dev/null +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/zookeeper/configurations/Reloadable.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.metron.common.zookeeper.configurations; + +import org.apache.metron.common.configuration.ConfigurationType; + +import java.io.Serializable; + +public interface Reloadable extends Serializable { + void reloadCallback(String name, ConfigurationType type); + +} http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/scripts/stellar ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/scripts/stellar b/metron-platform/metron-common/src/main/scripts/stellar index 56c2d4d..3f53c49 100644 --- a/metron-platform/metron-common/src/main/scripts/stellar +++ b/metron-platform/metron-common/src/main/scripts/stellar @@ -33,4 +33,4 @@ export METRON_VERSION=${project.version} export METRON_HOME=/usr/metron/$METRON_VERSION export STELLAR_LIB=$(find $METRON_HOME/lib/ -name metron-parsers*.jar) export MANAGEMENT_LIB=$(find $METRON_HOME/lib/ -name metron-management*.jar) -java $JVMFLAGS -cp "$HBASE_CONFIGS:${CONTRIB:-$METRON_HOME/contrib/*}:$STELLAR_LIB:$MANAGEMENT_LIB" org.apache.metron.stellar.common.shell.StellarShell "$@" +java $JVMFLAGS -cp "${CONTRIB:-$METRON_HOME/contrib/*}:$STELLAR_LIB:$MANAGEMENT_LIB:$HBASE_CONFIGS" org.apache.metron.stellar.common.shell.StellarShell "$@" http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java ---------------------------------------------------------------------- diff 
--git a/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java b/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java new file mode 100644 index 0000000..64bf986 --- /dev/null +++ b/metron-platform/metron-common/src/test/java/org/apache/metron/common/zookeeper/ZKConfigurationsCacheIntegrationTest.java @@ -0,0 +1,296 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.metron.common.zookeeper; + +import com.fasterxml.jackson.core.type.TypeReference; +import org.adrianwalker.multilinestring.Multiline; +import org.apache.commons.io.IOUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.metron.TestConstants; +import org.apache.metron.common.configuration.*; +import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; +import org.apache.metron.common.configuration.profiler.ProfilerConfig; +import org.apache.metron.common.configuration.profiler.ProfilerConfigurations; +import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.integration.components.ZKServerComponent; +import org.junit.*; + +import java.io.File; +import java.io.FileInputStream; +import java.util.Map; + +import static org.apache.metron.integration.utils.TestUtils.assertEventually; + +public class ZKConfigurationsCacheIntegrationTest { + private CuratorFramework client; + + /** + { + "profiles": [ + { + "profile": "example2", + "foreach": "ip_src_addr", + "onlyif": "protocol == 'HTTP'", + "init": { + "total_bytes": 0.0 + }, + "update": { + "total_bytes": "total_bytes + bytes_in" + }, + "result": "total_bytes", + "expires": 30 + } + ] +} + */ + @Multiline + public static String profilerConfig; + /** + { + "hdfs" : { + "index": "yaf", + "batchSize": 1, + "enabled" : true + }, + "elasticsearch" : { + "index": "yaf", + "batchSize": 25, + "batchTimeout": 7, + "enabled" : false + }, + "solr" : { + "index": "yaf", + "batchSize": 5, + "enabled" : false + } + } + */ + @Multiline + public static String testIndexingConfig; + + /** + { + "enrichment": { + "fieldMap": { } + ,"fieldToTypeMap": { } + }, + "threatIntel": { + "fieldMap": { }, + "fieldToTypeMap": { }, + "triageConfig" : { } + } + } + */ + @Multiline + public static String testEnrichmentConfig; + + /** + { + "parserClassName":"org.apache.metron.parsers.bro.BasicBroParser", + "sensorTopic":"brop", + "parserConfig": {} + } + */ + 
@Multiline + public static String testParserConfig; + + /** + *{ + "es.clustername": "metron", + "es.ip": "localhost", + "es.port": 9300, + "es.date.format": "yyyy.MM.dd.HH" + } + */ + @Multiline + public static String globalConfig; + + public static File profilerDir = new File("../../metron-analytics/metron-profiler/src/test/config/zookeeper"); + public ConfigurationsCache cache; + + public ZKServerComponent zkComponent; + + /* Starts a ZKServerComponent, a client and the cache, then writes one config of each type to zookeeper. */ + @Before + public void setup() throws Exception { + zkComponent = new ZKServerComponent(); + zkComponent.start(); + client = ConfigurationsUtils.getClient(zkComponent.getConnectionString()); + client.start(); + cache = new ZKConfigurationsCache(client); + cache.start(); + { + //parser + byte[] config = IOUtils.toByteArray(new FileInputStream(new File(TestConstants.PARSER_CONFIGS_PATH + "/parsers/bro.json"))); + ConfigurationsUtils.writeSensorParserConfigToZookeeper("bro", config, client); + } + { + //indexing + byte[] config = IOUtils.toByteArray(new FileInputStream(new File(TestConstants.SAMPLE_CONFIG_PATH + "/indexing/test.json"))); + ConfigurationsUtils.writeSensorIndexingConfigToZookeeper("test", config, client); + } + { + //enrichments + byte[] config = IOUtils.toByteArray(new FileInputStream(new File(TestConstants.SAMPLE_CONFIG_PATH + "/enrichments/test.json"))); + ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper("test", config, client); + } + { + //profiler + byte[] config = IOUtils.toByteArray(new FileInputStream(new File(profilerDir, "/readme-example-1/profiler.json"))); + ConfigurationsUtils.writeProfilerConfigToZookeeper( config, client); + } + { + //global config + byte[] config = IOUtils.toByteArray(new FileInputStream(new File(TestConstants.SAMPLE_CONFIG_PATH + "/global.json"))); + 
ConfigurationsUtils.writeGlobalConfigToZookeeper(config, client); + } + } + + @After + public void teardown() throws Exception { + if(cache != null) { + cache.close(); + } + if(client != null) { + client.close(); + } + if(zkComponent != null) { + zkComponent.stop(); + } + } + + + @Test + public void validateDelete() throws Exception { + client.delete().forPath(ConfigurationType.GLOBAL.getZookeeperRoot()); + client.delete().forPath(ConfigurationType.INDEXING.getZookeeperRoot() + "/test"); + client.delete().forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot() + "/test"); + client.delete().forPath(ConfigurationType.PARSER.getZookeeperRoot() + "/bro"); + client.delete().forPath(ConfigurationType.PROFILER.getZookeeperRoot() ); + //global + { + IndexingConfigurations config = cache.get( IndexingConfigurations.class); + assertEventually(() -> Assert.assertNull(config.getGlobalConfig(false))); + } + //indexing + { + IndexingConfigurations config = cache.get( IndexingConfigurations.class); + assertEventually(() -> Assert.assertNull(config.getSensorIndexingConfig("test", false))); + assertEventually(() -> Assert.assertNull(config.getGlobalConfig(false))); + } + //enrichment + { + EnrichmentConfigurations config = cache.get( EnrichmentConfigurations.class); + assertEventually(() -> Assert.assertNull(config.getSensorEnrichmentConfig("test"))); + assertEventually(()-> Assert.assertNull(config.getGlobalConfig(false))); + } + //parser + { + ParserConfigurations config = cache.get( ParserConfigurations.class); + assertEventually(() -> Assert.assertNull(config.getSensorParserConfig("bro"))); + assertEventually(() -> Assert.assertNull(config.getGlobalConfig(false))); + } + //profiler + { + ProfilerConfigurations config = cache.get( ProfilerConfigurations.class); + assertEventually(() -> Assert.assertNull(config.getProfilerConfig())); + assertEventually(() -> Assert.assertNull(config.getGlobalConfig(false))); + } + } + + @Test + public void validateUpdate() throws Exception { + 
ConfigurationsUtils.writeSensorIndexingConfigToZookeeper("test", testIndexingConfig.getBytes(), client); + ConfigurationsUtils.writeGlobalConfigToZookeeper(globalConfig.getBytes(), client); + ConfigurationsUtils.writeSensorEnrichmentConfigToZookeeper("test", testEnrichmentConfig.getBytes(), client); + ConfigurationsUtils.writeSensorParserConfigToZookeeper("bro", testParserConfig.getBytes(), client); + ConfigurationsUtils.writeProfilerConfigToZookeeper( profilerConfig.getBytes(), client); + //indexing + { + Map<String, Object> expectedConfig = JSONUtils.INSTANCE.load(testIndexingConfig, new TypeReference<Map<String, Object>>() {}); + IndexingConfigurations config = cache.get( IndexingConfigurations.class); + assertEventually(() -> Assert.assertEquals(expectedConfig, config.getSensorIndexingConfig("test"))); + } + //enrichment + { + SensorEnrichmentConfig expectedConfig = JSONUtils.INSTANCE.load(testEnrichmentConfig, SensorEnrichmentConfig.class); + Map<String, Object> expectedGlobalConfig = JSONUtils.INSTANCE.load(globalConfig, new TypeReference<Map<String, Object>>() {}); + EnrichmentConfigurations config = cache.get( EnrichmentConfigurations.class); + assertEventually(() -> Assert.assertEquals(expectedConfig, config.getSensorEnrichmentConfig("test"))); + assertEventually(() -> Assert.assertEquals(expectedGlobalConfig, config.getGlobalConfig())); + } + //parsers + { + SensorParserConfig expectedConfig = JSONUtils.INSTANCE.load(testParserConfig, SensorParserConfig.class); + ParserConfigurations config = cache.get( ParserConfigurations.class); + assertEventually(() -> Assert.assertEquals(expectedConfig, config.getSensorParserConfig("bro"))); + } + //profiler + { + ProfilerConfig expectedConfig = JSONUtils.INSTANCE.load(profilerConfig, ProfilerConfig.class); + ProfilerConfigurations config = cache.get( ProfilerConfigurations.class); + assertEventually(() -> Assert.assertEquals(expectedConfig, config.getProfilerConfig())); + } + } + + @Test + public void 
validateBaseWrite() throws Exception { + File globalConfigFile = new File(TestConstants.SAMPLE_CONFIG_PATH + "/global.json"); + Map<String, Object> expectedGlobalConfig = JSONUtils.INSTANCE.load(globalConfigFile, new TypeReference<Map<String, Object>>() { }); + //indexing + { + File inFile = new File(TestConstants.SAMPLE_CONFIG_PATH + "/indexing/test.json"); + Map<String, Object> expectedConfig = JSONUtils.INSTANCE.load(inFile, new TypeReference<Map<String, Object>>() { + }); + IndexingConfigurations config = cache.get( IndexingConfigurations.class); + assertEventually(() -> Assert.assertEquals(expectedConfig, config.getSensorIndexingConfig("test"))); + assertEventually(() -> Assert.assertEquals(expectedGlobalConfig, config.getGlobalConfig())); + assertEventually(() -> Assert.assertNull(config.getSensorIndexingConfig("notthere", false))); + } + //enrichment + { + File inFile = new File(TestConstants.SAMPLE_CONFIG_PATH + "/enrichments/test.json"); + SensorEnrichmentConfig expectedConfig = JSONUtils.INSTANCE.load(inFile, SensorEnrichmentConfig.class); + EnrichmentConfigurations config = cache.get( EnrichmentConfigurations.class); + assertEventually(() -> Assert.assertEquals(expectedConfig, config.getSensorEnrichmentConfig("test"))); + assertEventually(() -> Assert.assertEquals(expectedGlobalConfig, config.getGlobalConfig())); + assertEventually(() -> Assert.assertNull(config.getSensorEnrichmentConfig("notthere"))); + } + //parsers + { + File inFile = new File(TestConstants.PARSER_CONFIGS_PATH + "/parsers/bro.json"); + SensorParserConfig expectedConfig = JSONUtils.INSTANCE.load(inFile, SensorParserConfig.class); + ParserConfigurations config = cache.get( ParserConfigurations.class); + assertEventually(() -> Assert.assertEquals(expectedConfig, config.getSensorParserConfig("bro"))); + assertEventually(() -> Assert.assertEquals(expectedGlobalConfig, config.getGlobalConfig())); + assertEventually(() -> Assert.assertNull(config.getSensorParserConfig("notthere"))); + } + 
//profiler + { + File inFile = new File(profilerDir, "/readme-example-1/profiler.json"); + ProfilerConfig expectedConfig = JSONUtils.INSTANCE.load(inFile, ProfilerConfig.class); + ProfilerConfigurations config = cache.get( ProfilerConfigurations.class); + assertEventually(() -> Assert.assertEquals(expectedConfig, config.getProfilerConfig())); + assertEventually(() -> Assert.assertEquals(expectedGlobalConfig, config.getGlobalConfig())); + } + } +} http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java index 5f6f22f..308638e 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/BulkMessageWriterBoltTest.java @@ -123,7 +123,7 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest { .withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name()) .withMessageGetterField("message"); bulkMessageWriterBolt.setCuratorFramework(client); - bulkMessageWriterBolt.setTreeCache(cache); + bulkMessageWriterBolt.setZKCache(cache); bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType, new FileInputStream(sampleSensorIndexingConfigPath)); bulkMessageWriterBolt.declareOutputFields(declarer); @@ -182,7 +182,7 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest { .withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name()) 
.withMessageGetterField("message").withBatchTimeoutDivisor(3); bulkMessageWriterBolt.setCuratorFramework(client); - bulkMessageWriterBolt.setTreeCache(cache); + bulkMessageWriterBolt.setZKCache(cache); bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType, new FileInputStream(sampleSensorIndexingConfigPath)); bulkMessageWriterBolt.declareOutputFields(declarer); @@ -226,7 +226,7 @@ public class BulkMessageWriterBoltTest extends BaseEnrichmentBoltTest { .withBulkMessageWriter(bulkMessageWriter).withMessageGetter(MessageGetters.JSON_FROM_FIELD.name()) .withMessageGetterField("message"); bulkMessageWriterBolt.setCuratorFramework(client); - bulkMessageWriterBolt.setTreeCache(cache); + bulkMessageWriterBolt.setZKCache(cache); bulkMessageWriterBolt.getConfigurations().updateSensorIndexingConfig(sensorType , new FileInputStream(sampleSensorIndexingConfigPath)); bulkMessageWriterBolt.declareOutputFields(declarer); http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java index 77dd4cf..52135e3 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentJoinBoltTest.java @@ -73,7 +73,7 @@ public class EnrichmentJoinBoltTest extends BaseEnrichmentBoltTest { public void test() throws IOException { EnrichmentJoinBolt enrichmentJoinBolt = new EnrichmentJoinBolt("zookeeperUrl"); enrichmentJoinBolt.setCuratorFramework(client); - enrichmentJoinBolt.setTreeCache(cache); + 
enrichmentJoinBolt.setZKCache(cache); enrichmentJoinBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, new FileInputStream(sampleSensorEnrichmentConfigPath)); enrichmentJoinBolt.withMaxCacheSize(100); enrichmentJoinBolt.withMaxTimeRetain(10000); http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBoltTest.java index c79eb10..cbe7ed6 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBoltTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBoltTest.java @@ -58,7 +58,7 @@ public class EnrichmentSplitterBoltTest extends BaseEnrichmentBoltTest { EnrichmentSplitterBolt enrichmentSplitterBolt = new EnrichmentSplitterBolt("zookeeperUrl").withEnrichments(enrichments); enrichmentSplitterBolt.setCuratorFramework(client); - enrichmentSplitterBolt.setTreeCache(cache); + enrichmentSplitterBolt.setZKCache(cache); enrichmentSplitterBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, new FileInputStream(sampleSensorEnrichmentConfigPath)); enrichmentSplitterBolt.prepare(new HashMap<>(), topologyContext, outputCollector); http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java 
b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java index 90322fe..d7b54dd 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/GenericEnrichmentBoltTest.java @@ -161,7 +161,7 @@ public class GenericEnrichmentBoltTest extends BaseEnrichmentBoltTest { } }; genericEnrichmentBolt.setCuratorFramework(client); - genericEnrichmentBolt.setTreeCache(cache); + genericEnrichmentBolt.setZKCache(cache); genericEnrichmentBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, new FileInputStream(sampleSensorEnrichmentConfigPath)); HashMap<String, Object> globalConfig = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java index e03dc71..1bb1083 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/JoinBoltTest.java @@ -99,7 +99,7 @@ public class JoinBoltTest extends BaseEnrichmentBoltTest { } joinBolt = new StandAloneJoinBolt("zookeeperUrl"); joinBolt.setCuratorFramework(client); - joinBolt.setTreeCache(cache); + joinBolt.setZKCache(cache); } @Test http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/SplitBoltTest.java ---------------------------------------------------------------------- diff --git 
a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/SplitBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/SplitBoltTest.java index 41d34de..57dae13 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/SplitBoltTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/SplitBoltTest.java @@ -88,7 +88,7 @@ public class SplitBoltTest extends BaseEnrichmentBoltTest { public void test() { StandAloneSplitBolt splitBolt = spy(new StandAloneSplitBolt("zookeeperUrl")); splitBolt.setCuratorFramework(client); - splitBolt.setTreeCache(cache); + splitBolt.setZKCache(cache); doCallRealMethod().when(splitBolt).reloadCallback(anyString(), any(ConfigurationType.class)); splitBolt.prepare(new HashMap(), topologyContext, outputCollector); splitBolt.declareOutputFields(declarer); http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java index 04617ff..62b2570 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelJoinBoltTest.java @@ -157,7 +157,7 @@ public class ThreatIntelJoinBoltTest extends BaseEnrichmentBoltTest { ThreatIntelJoinBolt threatIntelJoinBolt = new ThreatIntelJoinBolt("zookeeperUrl"); threatIntelJoinBolt.setCuratorFramework(client); - threatIntelJoinBolt.setTreeCache(cache); + threatIntelJoinBolt.setZKCache(cache); 
SensorEnrichmentConfig enrichmentConfig = JSONUtils.INSTANCE.load( new FileInputStream(sampleSensorEnrichmentConfigPath), SensorEnrichmentConfig.class); http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBoltTest.java b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBoltTest.java index 4feba2e..f7869e5 100644 --- a/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBoltTest.java +++ b/metron-platform/metron-enrichment/src/test/java/org/apache/metron/enrichment/bolt/ThreatIntelSplitterBoltTest.java @@ -33,7 +33,7 @@ public class ThreatIntelSplitterBoltTest extends BaseEnrichmentBoltTest { String threatIntelType = "hbaseThreatIntel"; ThreatIntelSplitterBolt threatIntelSplitterBolt = new ThreatIntelSplitterBolt("zookeeperUrl"); threatIntelSplitterBolt.setCuratorFramework(client); - threatIntelSplitterBolt.setTreeCache(cache); + threatIntelSplitterBolt.setZKCache(cache); threatIntelSplitterBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, new FileInputStream(sampleSensorEnrichmentConfigPath)); threatIntelSplitterBolt.prepare(new HashMap<>(), topologyContext, outputCollector); Map<String, Object> fieldMap = threatIntelSplitterBolt.getFieldMap(sensorType); http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java 
b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java index e67d3c9..9577a43 100644 --- a/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java +++ b/metron-platform/metron-integration-test/src/main/java/org/apache/metron/integration/utils/TestUtils.java @@ -31,6 +31,28 @@ import java.util.ArrayList; import java.util.List; public class TestUtils { + public static long MAX_ASSERT_WAIT_MS = 30000L; + public interface Assertion { + void apply() throws Exception; + } + public static void assertEventually(Assertion assertion) throws Exception { + assertEventually(assertion, MAX_ASSERT_WAIT_MS); + } + private static void assertEventually(Assertion assertion + , long msToWait + ) throws Exception { + long delta = msToWait/10; + for(int i = 0;i < 10;++i) { + try{ + assertion.apply(); + return; + } + catch(AssertionError t) { + } + Thread.sleep(delta); + } + assertion.apply(); + } public static List<byte[]> readSampleData(String samplePath) throws IOException { BufferedReader br = new BufferedReader(new FileReader(samplePath));
