METRON-1241: Enable the REST API to use a cache for the zookeeper config similar to the Bolts; closes apache/incubator-metron#795
Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/cc111ec9 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/cc111ec9 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/cc111ec9 Branch: refs/heads/master Commit: cc111ec984a78db43c4df222851f59280ff5eff9 Parents: aee0184 Author: cstella <[email protected]> Authored: Fri Oct 20 17:20:06 2017 -0400 Committer: cstella <[email protected]> Committed: Fri Oct 20 17:20:06 2017 -0400 ---------------------------------------------------------------------- .../profiler/bolt/ProfileBuilderBoltTest.java | 2 +- .../profiler/bolt/ProfileSplitterBoltTest.java | 2 +- .../metron/rest/config/ZookeeperConfig.java | 11 + .../service/impl/GlobalConfigServiceImpl.java | 33 ++- .../impl/SensorEnrichmentConfigServiceImpl.java | 43 ++- .../impl/SensorIndexingConfigServiceImpl.java | 40 +-- .../impl/SensorParserConfigServiceImpl.java | 38 ++- .../apache/metron/rest/config/TestConfig.java | 11 + .../GlobalConfigControllerIntegrationTest.java | 6 +- ...richmentConfigControllerIntegrationTest.java | 6 +- ...IndexingConfigControllerIntegrationTest.java | 6 +- ...orParserConfigControllerIntegrationTest.java | 19 +- .../StormControllerIntegrationTest.java | 12 + .../impl/GlobalConfigServiceImplTest.java | 30 +- .../SensorEnrichmentConfigServiceImplTest.java | 99 +++---- .../SensorIndexingConfigServiceImplTest.java | 100 +++---- .../impl/SensorParserConfigServiceImplTest.java | 105 +++---- metron-platform/metron-common/pom.xml | 10 +- .../metron/common/bolt/ConfiguredBolt.java | 54 ++-- .../common/bolt/ConfiguredEnrichmentBolt.java | 30 +- .../common/bolt/ConfiguredIndexingBolt.java | 28 +- .../common/bolt/ConfiguredParserBolt.java | 30 +- .../common/bolt/ConfiguredProfilerBolt.java | 47 +-- .../common/configuration/Configurations.java | 27 +- .../configuration/ConfigurationsUtils.java | 64 +++- .../configuration/EnrichmentConfigurations.java | 46 ++- 
.../configuration/IndexingConfigurations.java | 38 ++- .../configuration/ParserConfigurations.java | 22 +- .../configuration/profiler/ProfileResult.java | 8 + .../profiler/ProfileResultExpressions.java | 7 + .../profiler/ProfileTriageExpressions.java | 23 ++ .../configuration/profiler/ProfilerConfig.java | 7 + .../profiler/ProfilerConfigurations.java | 11 +- .../common/zookeeper/ConfigurationsCache.java | 44 +++ .../common/zookeeper/ZKConfigurationsCache.java | 179 +++++++++++ .../configurations/ConfigurationsUpdater.java | 152 ++++++++++ .../configurations/EnrichmentUpdater.java | 78 +++++ .../configurations/IndexingUpdater.java | 74 +++++ .../zookeeper/configurations/ParserUpdater.java | 74 +++++ .../configurations/ProfilerUpdater.java | 96 ++++++ .../zookeeper/configurations/Reloadable.java | 27 ++ .../metron-common/src/main/scripts/stellar | 2 +- .../ZKConfigurationsCacheIntegrationTest.java | 296 +++++++++++++++++++ .../bolt/BulkMessageWriterBoltTest.java | 6 +- .../enrichment/bolt/EnrichmentJoinBoltTest.java | 2 +- .../bolt/EnrichmentSplitterBoltTest.java | 2 +- .../bolt/GenericEnrichmentBoltTest.java | 2 +- .../metron/enrichment/bolt/JoinBoltTest.java | 2 +- .../metron/enrichment/bolt/SplitBoltTest.java | 2 +- .../bolt/ThreatIntelJoinBoltTest.java | 2 +- .../bolt/ThreatIntelSplitterBoltTest.java | 2 +- .../metron/integration/utils/TestUtils.java | 22 ++ .../metron/parsers/bolt/ParserBoltTest.java | 176 +++++------ metron-platform/metron-test-utilities/pom.xml | 11 +- .../apache/metron/test/bolt/BaseBoltTest.java | 3 +- metron-platform/metron-zookeeper/pom.xml | 48 +++ .../metron/zookeeper/SimpleEventListener.java | 123 ++++++++ .../org/apache/metron/zookeeper/ZKCache.java | 196 ++++++++++++ metron-platform/pom.xml | 1 + .../stellar-common/src/main/scripts/stellar | 2 +- 60 files changed, 2027 insertions(+), 612 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java index 62be86e..21d61ab 100644 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java +++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileBuilderBoltTest.java @@ -147,7 +147,7 @@ public class ProfileBuilderBoltTest extends BaseBoltTest { ProfileBuilderBolt bolt = new ProfileBuilderBolt("zookeeperURL"); bolt.setCuratorFramework(client); - bolt.setTreeCache(cache); + bolt.setZKCache(cache); bolt.withPeriodDuration(10, TimeUnit.MINUTES); bolt.withProfileTimeToLive(30, TimeUnit.MINUTES); http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java index d51401f..beab8d5 100644 --- a/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java +++ b/metron-analytics/metron-profiler/src/test/java/org/apache/metron/profiler/bolt/ProfileSplitterBoltTest.java @@ -140,7 +140,7 @@ public class ProfileSplitterBoltTest extends BaseBoltTest { ProfileSplitterBolt bolt = new ProfileSplitterBolt("zookeeperURL"); bolt.setCuratorFramework(client); - bolt.setTreeCache(cache); + 
bolt.setZKCache(cache); bolt.getConfigurations().updateProfilerConfig(profilerConfig.getBytes("UTF-8")); bolt.prepare(new HashMap<>(), topologyContext, outputCollector); http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/ZookeeperConfig.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/ZookeeperConfig.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/ZookeeperConfig.java index 1f72afb..6f4656e 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/ZookeeperConfig.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/ZookeeperConfig.java @@ -24,6 +24,8 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.metron.common.configuration.ConfigurationsUtils; +import org.apache.metron.common.zookeeper.ConfigurationsCache; +import org.apache.metron.common.zookeeper.ZKConfigurationsCache; import org.apache.metron.rest.MetronRestConstants; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -37,6 +39,15 @@ import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE; public class ZookeeperConfig { @Bean(initMethod = "start", destroyMethod="close") + public ConfigurationsCache cache(CuratorFramework client) { + return new ZKConfigurationsCache( client + , ZKConfigurationsCache.ConfiguredTypes.ENRICHMENT + , ZKConfigurationsCache.ConfiguredTypes.PARSER + , ZKConfigurationsCache.ConfiguredTypes.INDEXING + ); + } + + @Bean(initMethod = "start", destroyMethod="close") public CuratorFramework client(Environment environment) { int sleepTime = 
Integer.parseInt(environment.getProperty(MetronRestConstants.CURATOR_SLEEP_TIME)); int maxRetries = Integer.parseInt(environment.getProperty(MetronRestConstants.CURATOR_MAX_RETRIES)); http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java index e80380b..ed67994 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImpl.java @@ -17,27 +17,34 @@ */ package org.apache.metron.rest.service.impl; -import com.fasterxml.jackson.core.type.TypeReference; import org.apache.curator.framework.CuratorFramework; import org.apache.metron.common.configuration.ConfigurationType; import org.apache.metron.common.configuration.ConfigurationsUtils; -import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.common.configuration.EnrichmentConfigurations; +import org.apache.metron.common.zookeeper.ConfigurationsCache; import org.apache.metron.rest.RestException; import org.apache.metron.rest.service.GlobalConfigService; +import org.apache.metron.common.zookeeper.ZKConfigurationsCache; import org.apache.zookeeper.KeeperException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.io.ByteArrayInputStream; import java.util.Map; @Service public class GlobalConfigServiceImpl implements GlobalConfigService { private CuratorFramework client; + private ConfigurationsCache cache; + @Autowired - public 
GlobalConfigServiceImpl(CuratorFramework client) { + public GlobalConfigServiceImpl(CuratorFramework client, ConfigurationsCache cache) { this.client = client; + this.cache = cache; + } + + public void setCache(ConfigurationsCache cache) { + this.cache = cache; } @Override @@ -52,16 +59,14 @@ public class GlobalConfigServiceImpl implements GlobalConfigService { @Override public Map<String, Object> get() throws RestException { - Map<String, Object> globalConfig; - try { - byte[] globalConfigBytes = ConfigurationsUtils.readGlobalConfigBytesFromZookeeper(client); - globalConfig = JSONUtils.INSTANCE.load(new ByteArrayInputStream(globalConfigBytes), new TypeReference<Map<String, Object>>(){}); - } catch (KeeperException.NoNodeException e) { - return null; - } catch (Exception e) { - throw new RestException(e); - } - return globalConfig; + Map<String, Object> globalConfig; + try { + EnrichmentConfigurations configs = cache.get( EnrichmentConfigurations.class); + globalConfig = configs.getGlobalConfig(false); + } catch (Exception e) { + throw new RestException(e.getMessage(), e); + } + return globalConfig; } @Override http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java index d4438a4..293b113 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImpl.java @@ -22,9 +22,12 @@ import org.apache.curator.framework.CuratorFramework; import 
org.apache.metron.common.aggregator.Aggregators; import org.apache.metron.common.configuration.ConfigurationType; import org.apache.metron.common.configuration.ConfigurationsUtils; +import org.apache.metron.common.configuration.EnrichmentConfigurations; import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; +import org.apache.metron.common.zookeeper.ConfigurationsCache; import org.apache.metron.rest.RestException; import org.apache.metron.rest.service.SensorEnrichmentConfigService; +import org.apache.metron.common.zookeeper.ZKConfigurationsCache; import org.apache.zookeeper.KeeperException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; @@ -43,10 +46,13 @@ public class SensorEnrichmentConfigServiceImpl implements SensorEnrichmentConfig private CuratorFramework client; + private ConfigurationsCache cache; + @Autowired - public SensorEnrichmentConfigServiceImpl(ObjectMapper objectMapper, CuratorFramework client) { + public SensorEnrichmentConfigServiceImpl(ObjectMapper objectMapper, CuratorFramework client, ConfigurationsCache cache) { this.objectMapper = objectMapper; this.client = client; + this.cache = cache; } @Override @@ -61,38 +67,27 @@ public class SensorEnrichmentConfigServiceImpl implements SensorEnrichmentConfig @Override public SensorEnrichmentConfig findOne(String name) throws RestException { - SensorEnrichmentConfig sensorEnrichmentConfig; - try { - sensorEnrichmentConfig = ConfigurationsUtils.readSensorEnrichmentConfigFromZookeeper(name, client); - } catch (KeeperException.NoNodeException e) { - return null; - } catch (Exception e) { - throw new RestException(e); - } - return sensorEnrichmentConfig; + EnrichmentConfigurations configs = cache.get( EnrichmentConfigurations.class); + return configs.getSensorEnrichmentConfig(name); } @Override public Map<String, SensorEnrichmentConfig> getAll() throws RestException { - Map<String, SensorEnrichmentConfig> 
sensorEnrichmentConfigs = new HashMap<>(); - List<String> sensorNames = getAllTypes(); - for (String name : sensorNames) { - sensorEnrichmentConfigs.put(name, findOne(name)); + Map<String, SensorEnrichmentConfig> sensorEnrichmentConfigs = new HashMap<>(); + List<String> sensorNames = getAllTypes(); + for (String name : sensorNames) { + SensorEnrichmentConfig config = findOne(name); + if(config != null) { + sensorEnrichmentConfigs.put(name, config); } - return sensorEnrichmentConfigs; + } + return sensorEnrichmentConfigs; } @Override public List<String> getAllTypes() throws RestException { - List<String> types; - try { - types = client.getChildren().forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot()); - } catch (KeeperException.NoNodeException e) { - types = new ArrayList<>(); - } catch (Exception e) { - throw new RestException(e); - } - return types; + EnrichmentConfigurations configs = cache.get( EnrichmentConfigurations.class); + return configs.getTypes(); } @Override http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImpl.java index 9f984e0..5c73b26 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImpl.java @@ -17,20 +17,19 @@ */ package org.apache.metron.rest.service.impl; -import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.curator.framework.CuratorFramework; import 
org.apache.metron.common.configuration.ConfigurationType; import org.apache.metron.common.configuration.ConfigurationsUtils; -import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.common.configuration.IndexingConfigurations; +import org.apache.metron.common.zookeeper.ConfigurationsCache; import org.apache.metron.rest.RestException; import org.apache.metron.rest.service.SensorIndexingConfigService; +import org.apache.metron.common.zookeeper.ZKConfigurationsCache; import org.apache.zookeeper.KeeperException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.io.ByteArrayInputStream; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -42,10 +41,13 @@ public class SensorIndexingConfigServiceImpl implements SensorIndexingConfigServ private CuratorFramework client; + private ConfigurationsCache cache; + @Autowired - public SensorIndexingConfigServiceImpl(ObjectMapper objectMapper, CuratorFramework client) { + public SensorIndexingConfigServiceImpl(ObjectMapper objectMapper, CuratorFramework client, ConfigurationsCache cache) { this.objectMapper = objectMapper; this.client = client; + this.cache = cache; } @Override @@ -60,16 +62,8 @@ public class SensorIndexingConfigServiceImpl implements SensorIndexingConfigServ @Override public Map<String, Object> findOne(String name) throws RestException { - Map<String, Object> sensorIndexingConfig; - try { - byte[] sensorIndexingConfigBytes = ConfigurationsUtils.readSensorIndexingConfigBytesFromZookeeper(name, client); - sensorIndexingConfig = JSONUtils.INSTANCE.load(new ByteArrayInputStream(sensorIndexingConfigBytes), new TypeReference<Map<String, Object>>(){}); - } catch (KeeperException.NoNodeException e) { - return null; - } catch (Exception e) { - throw new RestException(e); - } - return sensorIndexingConfig; + IndexingConfigurations configs = cache.get( IndexingConfigurations.class); + 
return configs.getSensorIndexingConfig(name, false); } @Override @@ -77,22 +71,18 @@ public class SensorIndexingConfigServiceImpl implements SensorIndexingConfigServ Map<String, Map<String, Object>> sensorIndexingConfigs = new HashMap<>(); List<String> sensorNames = getAllTypes(); for (String name : sensorNames) { - sensorIndexingConfigs.put(name, findOne(name)); + Map<String, Object> config = findOne(name); + if(config != null) { + sensorIndexingConfigs.put(name, config); + } } return sensorIndexingConfigs; } @Override public List<String> getAllTypes() throws RestException { - List<String> types; - try { - types = client.getChildren().forPath(ConfigurationType.INDEXING.getZookeeperRoot()); - } catch (KeeperException.NoNodeException e) { - types = new ArrayList<>(); - } catch (Exception e) { - throw new RestException(e); - } - return types; + IndexingConfigurations configs = cache.get( IndexingConfigurations.class); + return configs.getTypes(); } @Override http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java index f99b41c..7e70344 100644 --- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java +++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImpl.java @@ -29,13 +29,16 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.hadoop.fs.Path; import org.apache.metron.common.configuration.ConfigurationType; import org.apache.metron.common.configuration.ConfigurationsUtils; +import 
org.apache.metron.common.configuration.ParserConfigurations; import org.apache.metron.common.configuration.SensorParserConfig; +import org.apache.metron.common.zookeeper.ConfigurationsCache; import org.apache.metron.parsers.interfaces.MessageParser; import org.apache.metron.rest.MetronRestConstants; import org.apache.metron.rest.RestException; import org.apache.metron.rest.model.ParseMessageRequest; import org.apache.metron.rest.service.GrokService; import org.apache.metron.rest.service.SensorParserConfigService; +import org.apache.metron.common.zookeeper.ZKConfigurationsCache; import org.apache.zookeeper.KeeperException; import org.json.simple.JSONObject; import org.reflections.Reflections; @@ -49,17 +52,21 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService private CuratorFramework client; + private ConfigurationsCache cache; + private GrokService grokService; + private Map<String, String> availableParsers; + @Autowired public SensorParserConfigServiceImpl(ObjectMapper objectMapper, CuratorFramework client, - GrokService grokService) { + GrokService grokService, ConfigurationsCache cache) { this.objectMapper = objectMapper; this.client = client; this.grokService = grokService; + this.cache = cache; } - private Map<String, String> availableParsers; @Override public SensorParserConfig save(SensorParserConfig sensorParserConfig) throws RestException { @@ -74,15 +81,8 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService @Override public SensorParserConfig findOne(String name) throws RestException { - SensorParserConfig sensorParserConfig; - try { - sensorParserConfig = ConfigurationsUtils.readSensorParserConfigFromZookeeper(name, client); - } catch (KeeperException.NoNodeException e) { - return null; - } catch (Exception e) { - throw new RestException(e); - } - return sensorParserConfig; + ParserConfigurations configs = cache.get( ParserConfigurations.class); + return configs.getSensorParserConfig(name); } 
@Override @@ -90,7 +90,10 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService List<SensorParserConfig> sensorParserConfigs = new ArrayList<>(); List<String> sensorNames = getAllTypes(); for (String name : sensorNames) { - sensorParserConfigs.add(findOne(name)); + SensorParserConfig config = findOne(name); + if(config != null) { + sensorParserConfigs.add(config); + } } return sensorParserConfigs; } @@ -109,15 +112,8 @@ public class SensorParserConfigServiceImpl implements SensorParserConfigService @Override public List<String> getAllTypes() throws RestException { - List<String> types; - try { - types = client.getChildren().forPath(ConfigurationType.PARSER.getZookeeperRoot()); - } catch (KeeperException.NoNodeException e) { - types = new ArrayList<>(); - } catch (Exception e) { - throw new RestException(e); - } - return types; + ParserConfigurations configs = cache.get( ParserConfigurations.class); + return configs.getTypes(); } @Override http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java index ea64fbe..1150189 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java @@ -36,6 +36,8 @@ import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.metron.common.configuration.ConfigurationsUtils; +import org.apache.metron.common.zookeeper.ConfigurationsCache; +import 
org.apache.metron.common.zookeeper.ZKConfigurationsCache; import org.apache.metron.hbase.mock.MockHBaseTableProvider; import org.apache.metron.integration.ComponentRunner; import org.apache.metron.integration.UnableToStartException; @@ -75,6 +77,15 @@ public class TestConfig { return new KafkaComponent().withTopologyProperties(zkProperties); } + @Bean(initMethod = "start", destroyMethod="close") + public ConfigurationsCache cache(CuratorFramework client) { + return new ZKConfigurationsCache( client + , ZKConfigurationsCache.ConfiguredTypes.ENRICHMENT + , ZKConfigurationsCache.ConfiguredTypes.PARSER + , ZKConfigurationsCache.ConfiguredTypes.INDEXING + ); + } + @Bean(destroyMethod = "stop") public ComponentRunner componentRunner(ZKServerComponent zkServerComponent, KafkaComponent kafkaWithZKComponent) { ComponentRunner runner = new ComponentRunner.Builder() http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GlobalConfigControllerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GlobalConfigControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GlobalConfigControllerIntegrationTest.java index f4e18ea..abb75b1 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GlobalConfigControllerIntegrationTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/GlobalConfigControllerIntegrationTest.java @@ -31,6 +31,7 @@ import org.springframework.test.web.servlet.MockMvc; import org.springframework.test.web.servlet.setup.MockMvcBuilders; import org.springframework.web.context.WebApplicationContext; +import static org.apache.metron.integration.utils.TestUtils.assertEventually; import static 
org.apache.metron.rest.MetronRestConstants.TEST_PROFILE; import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf; import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic; @@ -97,9 +98,10 @@ public class GlobalConfigControllerIntegrationTest { .andExpect(status().isCreated()) .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))); - this.mockMvc.perform(post(globalConfigUrl).with(httpBasic(user,password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(globalJson)) + assertEventually(() -> this.mockMvc.perform(post(globalConfigUrl).with(httpBasic(user,password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(globalJson)) .andExpect(status().isOk()) - .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))); + .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) + ); this.mockMvc.perform(get(globalConfigUrl).with(httpBasic(user,password))) .andExpect(status().isOk()); http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorEnrichmentConfigControllerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorEnrichmentConfigControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorEnrichmentConfigControllerIntegrationTest.java index dd4eff7..15a2370 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorEnrichmentConfigControllerIntegrationTest.java +++ 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorEnrichmentConfigControllerIntegrationTest.java @@ -31,6 +31,7 @@ import org.springframework.test.web.servlet.MockMvc; import org.springframework.test.web.servlet.setup.MockMvcBuilders; import org.springframework.web.context.WebApplicationContext; +import static org.apache.metron.integration.utils.TestUtils.assertEventually; import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE; import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf; import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic; @@ -167,7 +168,7 @@ public class SensorEnrichmentConfigControllerIntegrationTest { .andExpect(jsonPath("$.threatIntel.triageConfig.riskLevelRules[0].score").value(10)) .andExpect(jsonPath("$.threatIntel.triageConfig.aggregator").value("MAX")); - this.mockMvc.perform(post(sensorEnrichmentConfigUrl + "/broTest").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broJson)) + assertEventually(() -> this.mockMvc.perform(post(sensorEnrichmentConfigUrl + "/broTest").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broJson)) .andExpect(status().isOk()) .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) .andExpect(jsonPath("$.enrichment.fieldMap.geo[0]").value("ip_dst_addr")) @@ -183,7 +184,8 @@ public class SensorEnrichmentConfigControllerIntegrationTest { .andExpect(jsonPath("$.threatIntel.fieldToTypeMap.ip_dst_addr[0]").value("malicious_ip")) .andExpect(jsonPath("$.threatIntel.triageConfig.riskLevelRules[0].rule").value("ip_src_addr == '10.122.196.204' or ip_dst_addr == '10.122.196.204'")) .andExpect(jsonPath("$.threatIntel.triageConfig.riskLevelRules[0].score").value(10)) - 
.andExpect(jsonPath("$.threatIntel.triageConfig.aggregator").value("MAX")); + .andExpect(jsonPath("$.threatIntel.triageConfig.aggregator").value("MAX") ) + ); this.mockMvc.perform(get(sensorEnrichmentConfigUrl + "/broTest").with(httpBasic(user,password))) .andExpect(status().isOk()) http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorIndexingConfigControllerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorIndexingConfigControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorIndexingConfigControllerIntegrationTest.java index cebcde6..674c55a 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorIndexingConfigControllerIntegrationTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorIndexingConfigControllerIntegrationTest.java @@ -31,6 +31,7 @@ import org.springframework.test.web.servlet.MockMvc; import org.springframework.test.web.servlet.setup.MockMvcBuilders; import org.springframework.web.context.WebApplicationContext; +import static org.apache.metron.integration.utils.TestUtils.assertEventually; import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE; import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf; import static org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.httpBasic; @@ -103,11 +104,12 @@ public class SensorIndexingConfigControllerIntegrationTest { .andExpect(jsonPath("$.index").value("broTest")) .andExpect(jsonPath("$.batchSize").value(1)); - this.mockMvc.perform(post(sensorIndexingConfigUrl + "/broTest").with(httpBasic(user, 
password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broJson)) + assertEventually(() -> this.mockMvc.perform(post(sensorIndexingConfigUrl + "/broTest").with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broJson)) .andExpect(status().isOk()) .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) .andExpect(jsonPath("$.index").value("broTest")) - .andExpect(jsonPath("$.batchSize").value(1)); + .andExpect(jsonPath("$.batchSize").value(1)) + ); this.mockMvc.perform(get(sensorIndexingConfigUrl + "/broTest").with(httpBasic(user,password))) .andExpect(status().isOk()) http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java index 6e2d788..d8aea72 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/SensorParserConfigControllerIntegrationTest.java @@ -38,7 +38,9 @@ import org.springframework.web.context.WebApplicationContext; import java.io.File; import java.io.IOException; import java.lang.reflect.Method; +import java.util.concurrent.atomic.AtomicInteger; +import static org.apache.metron.integration.utils.TestUtils.assertEventually; import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE; import static org.hamcrest.Matchers.hasSize; import static 
org.springframework.security.test.web.servlet.request.SecurityMockMvcRequestPostProcessors.csrf; @@ -198,16 +200,16 @@ public class SensorParserConfigControllerIntegrationTest { this.sensorParserConfigService.delete("broTest"); this.sensorParserConfigService.delete("squidTest"); Method[] method = SensorParserConfig.class.getMethods(); - int numFields = 0; + final AtomicInteger numFields = new AtomicInteger(0); for(Method m : method) { if(m.getName().startsWith("set")) { - numFields++; + numFields.set(numFields.get() + 1); } } this.mockMvc.perform(post(sensorParserConfigUrl).with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(squidJson)) .andExpect(status().isCreated()) .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) - .andExpect(jsonPath("$.*", hasSize(numFields))) + .andExpect(jsonPath("$.*", hasSize(numFields.get()))) .andExpect(jsonPath("$.parserClassName").value("org.apache.metron.parsers.GrokParser")) .andExpect(jsonPath("$.sensorTopic").value("squidTest")) .andExpect(jsonPath("$.parserConfig.grokPath").value("target/patterns/squidTest")) @@ -219,10 +221,10 @@ public class SensorParserConfigControllerIntegrationTest { .andExpect(jsonPath("$.fieldTransformations[0].config.full_hostname").value("URL_TO_HOST(url)")) .andExpect(jsonPath("$.fieldTransformations[0].config.domain_without_subdomains").value("DOMAIN_REMOVE_SUBDOMAINS(full_hostname)")); - this.mockMvc.perform(get(sensorParserConfigUrl + "/squidTest").with(httpBasic(user,password))) + assertEventually(() -> this.mockMvc.perform(get(sensorParserConfigUrl + "/squidTest").with(httpBasic(user,password))) .andExpect(status().isOk()) .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) - .andExpect(jsonPath("$.*", hasSize(numFields))) + .andExpect(jsonPath("$.*", hasSize(numFields.get()))) 
.andExpect(jsonPath("$.parserClassName").value("org.apache.metron.parsers.GrokParser")) .andExpect(jsonPath("$.sensorTopic").value("squidTest")) .andExpect(jsonPath("$.parserConfig.grokPath").value("target/patterns/squidTest")) @@ -232,7 +234,8 @@ public class SensorParserConfigControllerIntegrationTest { .andExpect(jsonPath("$.fieldTransformations[0].output[0]").value("full_hostname")) .andExpect(jsonPath("$.fieldTransformations[0].output[1]").value("domain_without_subdomains")) .andExpect(jsonPath("$.fieldTransformations[0].config.full_hostname").value("URL_TO_HOST(url)")) - .andExpect(jsonPath("$.fieldTransformations[0].config.domain_without_subdomains").value("DOMAIN_REMOVE_SUBDOMAINS(full_hostname)")); + .andExpect(jsonPath("$.fieldTransformations[0].config.domain_without_subdomains").value("DOMAIN_REMOVE_SUBDOMAINS(full_hostname)")) + ); this.mockMvc.perform(get(sensorParserConfigUrl).with(httpBasic(user,password))) .andExpect(status().isOk()) @@ -251,7 +254,7 @@ public class SensorParserConfigControllerIntegrationTest { this.mockMvc.perform(post(sensorParserConfigUrl).with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broJson)) .andExpect(status().isCreated()) .andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) - .andExpect(jsonPath("$.*", hasSize(numFields))) + .andExpect(jsonPath("$.*", hasSize(numFields.get()))) .andExpect(jsonPath("$.parserClassName").value("org.apache.metron.parsers.bro.BasicBroParser")) .andExpect(jsonPath("$.sensorTopic").value("broTest")) .andExpect(jsonPath("$.readMetadata").value("true")) @@ -261,7 +264,7 @@ public class SensorParserConfigControllerIntegrationTest { this.mockMvc.perform(post(sensorParserConfigUrl).with(httpBasic(user, password)).with(csrf()).contentType(MediaType.parseMediaType("application/json;charset=UTF-8")).content(broJson)) .andExpect(status().isOk()) 
.andExpect(content().contentType(MediaType.parseMediaType("application/json;charset=UTF-8"))) - .andExpect(jsonPath("$.*", hasSize(numFields))) + .andExpect(jsonPath("$.*", hasSize(numFields.get()))) .andExpect(jsonPath("$.parserClassName").value("org.apache.metron.parsers.bro.BasicBroParser")) .andExpect(jsonPath("$.sensorTopic").value("broTest")) .andExpect(jsonPath("$.readMetadata").value("true")) http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java index 5c6dd12..e3518ca 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/controller/StormControllerIntegrationTest.java @@ -18,9 +18,11 @@ package org.apache.metron.rest.controller; import org.apache.metron.common.configuration.SensorParserConfig; +import org.apache.metron.integration.utils.TestUtils; import org.apache.metron.rest.model.TopologyStatusCode; import org.apache.metron.rest.service.GlobalConfigService; import org.apache.metron.rest.service.SensorParserConfigService; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -169,6 +171,11 @@ public class StormControllerIntegrationTest { .andExpect(jsonPath("$.message").value(TopologyStatusCode.GLOBAL_CONFIG_MISSING.name())); globalConfigService.save(globalConfig); + { + final Map<String, Object> expectedGlobalConfig = globalConfig; + //we must wait for the config to find its way into the config. 
+ TestUtils.assertEventually(() -> Assert.assertEquals(expectedGlobalConfig, globalConfigService.get())); + } this.mockMvc.perform(get(stormUrl + "/parser/start/broTest").with(httpBasic(user,password))) .andExpect(status().isOk()) @@ -179,6 +186,11 @@ public class StormControllerIntegrationTest { sensorParserConfig.setParserClassName("org.apache.metron.parsers.bro.BasicBroParser"); sensorParserConfig.setSensorTopic("broTest"); sensorParserConfigService.save(sensorParserConfig); + { + final Map<String, Object> expectedGlobalConfig = globalConfig; + //we must wait for the config to find its way into the config. + TestUtils.assertEventually(() -> Assert.assertEquals(expectedGlobalConfig, globalConfigService.get())); + } this.mockMvc.perform(get(stormUrl + "/parser/start/broTest").with(httpBasic(user,password))) .andExpect(status().isOk()) http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImplTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImplTest.java index 824fb4b..85a66b3 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImplTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/GlobalConfigServiceImplTest.java @@ -28,11 +28,15 @@ import static org.mockito.Mockito.when; import java.util.HashMap; import java.util.Map; + +import com.google.common.collect.ImmutableMap; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.DeleteBuilder; import org.apache.curator.framework.api.GetDataBuilder; import org.apache.curator.framework.api.SetDataBuilder; import 
org.apache.metron.common.configuration.ConfigurationType; +import org.apache.metron.common.configuration.EnrichmentConfigurations; +import org.apache.metron.common.zookeeper.ConfigurationsCache; import org.apache.metron.rest.RestException; import org.apache.metron.rest.service.GlobalConfigService; import org.apache.zookeeper.KeeperException; @@ -49,11 +53,13 @@ public class GlobalConfigServiceImplTest { CuratorFramework curatorFramework; GlobalConfigService globalConfigService; + ConfigurationsCache cache; @Before public void setUp() throws Exception { curatorFramework = mock(CuratorFramework.class); - globalConfigService = new GlobalConfigServiceImpl(curatorFramework); + cache = mock(ConfigurationsCache.class); + globalConfigService = new GlobalConfigServiceImpl(curatorFramework, cache); } @@ -98,25 +104,19 @@ public class GlobalConfigServiceImplTest { put("k", "v"); }}; - GetDataBuilder getDataBuilder = mock(GetDataBuilder.class); - when(getDataBuilder.forPath(ConfigurationType.GLOBAL.getZookeeperRoot())).thenReturn(config.getBytes()); - - when(curatorFramework.getData()).thenReturn(getDataBuilder); + EnrichmentConfigurations configs = new EnrichmentConfigurations(){ + @Override + public Map<String, Object> getConfigurations() { + return ImmutableMap.of(ConfigurationType.GLOBAL.getTypeName(), configMap); + } + }; + when(cache.get( eq(EnrichmentConfigurations.class))) + .thenReturn(configs); assertEquals(configMap, globalConfigService.get()); } @Test - public void getShouldReturnNullWhenNoNodeExceptionIsThrown() throws Exception { - GetDataBuilder getDataBuilder = mock(GetDataBuilder.class); - when(getDataBuilder.forPath(ConfigurationType.GLOBAL.getZookeeperRoot())).thenThrow(KeeperException.NoNodeException.class); - - when(curatorFramework.getData()).thenReturn(getDataBuilder); - - assertNull(globalConfigService.get()); - } - - @Test public void getShouldWrapNonNoNodeExceptionInRestException() throws Exception { exception.expect(RestException.class); 
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java index c26a210..0a78f4a 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorEnrichmentConfigServiceImplTest.java @@ -18,6 +18,7 @@ package org.apache.metron.rest.service.impl; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; import org.adrianwalker.multilinestring.Multiline; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.DeleteBuilder; @@ -25,9 +26,11 @@ import org.apache.curator.framework.api.GetChildrenBuilder; import org.apache.curator.framework.api.GetDataBuilder; import org.apache.curator.framework.api.SetDataBuilder; import org.apache.metron.common.configuration.ConfigurationType; +import org.apache.metron.common.configuration.EnrichmentConfigurations; import org.apache.metron.common.configuration.enrichment.EnrichmentConfig; import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig; import org.apache.metron.common.configuration.enrichment.threatintel.ThreatIntelConfig; +import org.apache.metron.common.zookeeper.ConfigurationsCache; import org.apache.metron.rest.RestException; import org.apache.metron.rest.service.SensorEnrichmentConfigService; import org.apache.zookeeper.KeeperException; @@ -40,6 +43,7 @@ import org.junit.rules.ExpectedException; import 
java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.Map; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -79,11 +83,14 @@ public class SensorEnrichmentConfigServiceImplTest { @Multiline public static String broJson; + ConfigurationsCache cache; + @Before public void setUp() throws Exception { objectMapper = mock(ObjectMapper.class); curatorFramework = mock(CuratorFramework.class); - sensorEnrichmentConfigService = new SensorEnrichmentConfigServiceImpl(objectMapper, curatorFramework); + cache = mock(ConfigurationsCache.class); + sensorEnrichmentConfigService = new SensorEnrichmentConfigServiceImpl(objectMapper, curatorFramework, cache); } @@ -125,84 +132,54 @@ public class SensorEnrichmentConfigServiceImplTest { public void findOneShouldProperlyReturnSensorEnrichmentConfig() throws Exception { final SensorEnrichmentConfig sensorEnrichmentConfig = getTestSensorEnrichmentConfig(); - GetDataBuilder getDataBuilder = mock(GetDataBuilder.class); - when(getDataBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot() + "/bro")).thenReturn(broJson.getBytes()); - when(curatorFramework.getData()).thenReturn(getDataBuilder); + EnrichmentConfigurations configs = new EnrichmentConfigurations(){ + @Override + public Map<String, Object> getConfigurations() { + return ImmutableMap.of(EnrichmentConfigurations.getKey("bro"), sensorEnrichmentConfig); + } + }; + when(cache.get(eq(EnrichmentConfigurations.class))) + .thenReturn(configs); + //We only have bro, so we should expect it to be returned assertEquals(getTestSensorEnrichmentConfig(), sensorEnrichmentConfigService.findOne("bro")); - } - - @Test - public void findOneShouldReturnNullWhenNoNodeExceptionIsThrown() throws Exception { - GetDataBuilder getDataBuilder = mock(GetDataBuilder.class); - when(getDataBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot() + "/bro")).thenThrow(KeeperException.NoNodeException.class); - - 
when(curatorFramework.getData()).thenReturn(getDataBuilder); - - assertNull(sensorEnrichmentConfigService.findOne("bro")); - } - - @Test - public void findOneShouldWrapNonNoNodeExceptionInRestException() throws Exception { - exception.expect(RestException.class); - - GetDataBuilder getDataBuilder = mock(GetDataBuilder.class); - when(getDataBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot() + "/bro")).thenThrow(Exception.class); - - when(curatorFramework.getData()).thenReturn(getDataBuilder); - - sensorEnrichmentConfigService.findOne("bro"); + //and blah should be a miss. + assertNull(sensorEnrichmentConfigService.findOne("blah")); } @Test public void getAllTypesShouldProperlyReturnTypes() throws Exception { - GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class); - when(getChildrenBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot())) - .thenReturn(new ArrayList() {{ - add("bro"); - add("squid"); - }}); - when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder); + + EnrichmentConfigurations configs = new EnrichmentConfigurations(){ + @Override + public Map<String, Object> getConfigurations() { + return ImmutableMap.of(EnrichmentConfigurations.getKey("bro"), new HashMap<>() + ,EnrichmentConfigurations.getKey("squid"), new HashMap<>() + ); + } + }; + when(cache.get(eq(EnrichmentConfigurations.class))) + .thenReturn(configs); assertEquals(new ArrayList() {{ add("bro"); add("squid"); }}, sensorEnrichmentConfigService.getAllTypes()); - } - @Test - public void getAllTypesShouldReturnNullWhenNoNodeExceptionIsThrown() throws Exception { - GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class); - when(getChildrenBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot())).thenThrow(KeeperException.NoNodeException.class); - when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder); - - assertEquals(new ArrayList<>(), sensorEnrichmentConfigService.getAllTypes()); } - @Test - public void 
getAllTypesShouldWrapNonNoNodeExceptionInRestException() throws Exception { - exception.expect(RestException.class); - - GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class); - when(getChildrenBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot())).thenThrow(Exception.class); - when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder); - - sensorEnrichmentConfigService.getAllTypes(); - } @Test public void getAllShouldProperlyReturnSensorEnrichmentConfigs() throws Exception { - GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class); - when(getChildrenBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot())) - .thenReturn(new ArrayList() {{ - add("bro"); - }}); - when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder); - final SensorEnrichmentConfig sensorEnrichmentConfig = getTestSensorEnrichmentConfig(); - GetDataBuilder getDataBuilder = mock(GetDataBuilder.class); - when(getDataBuilder.forPath(ConfigurationType.ENRICHMENT.getZookeeperRoot() + "/bro")).thenReturn(broJson.getBytes()); - when(curatorFramework.getData()).thenReturn(getDataBuilder); + EnrichmentConfigurations configs = new EnrichmentConfigurations(){ + @Override + public Map<String, Object> getConfigurations() { + return ImmutableMap.of(EnrichmentConfigurations.getKey("bro"), sensorEnrichmentConfig); + } + }; + when(cache.get( eq(EnrichmentConfigurations.class))) + .thenReturn(configs); assertEquals(new HashMap() {{ put("bro", sensorEnrichmentConfig);}}, sensorEnrichmentConfigService.getAll()); } http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImplTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImplTest.java 
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImplTest.java index 43ca0f7..9641a52 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImplTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorIndexingConfigServiceImplTest.java @@ -17,7 +17,10 @@ */ package org.apache.metron.rest.service.impl; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import org.adrianwalker.multilinestring.Multiline; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.DeleteBuilder; @@ -25,6 +28,9 @@ import org.apache.curator.framework.api.GetChildrenBuilder; import org.apache.curator.framework.api.GetDataBuilder; import org.apache.curator.framework.api.SetDataBuilder; import org.apache.metron.common.configuration.ConfigurationType; +import org.apache.metron.common.configuration.IndexingConfigurations; +import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.common.zookeeper.ConfigurationsCache; import org.apache.metron.rest.RestException; import org.apache.metron.rest.service.SensorIndexingConfigService; import org.apache.zookeeper.KeeperException; @@ -36,6 +42,7 @@ import org.junit.rules.ExpectedException; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.junit.Assert.assertEquals; @@ -55,6 +62,7 @@ public class SensorIndexingConfigServiceImplTest { ObjectMapper objectMapper; CuratorFramework curatorFramework; SensorIndexingConfigService sensorIndexingConfigService; + ConfigurationsCache cache; /** { @@ -72,7 +80,8 @@ public class SensorIndexingConfigServiceImplTest { public void setUp() throws Exception { objectMapper = 
mock(ObjectMapper.class); curatorFramework = mock(CuratorFramework.class); - sensorIndexingConfigService = new SensorIndexingConfigServiceImpl(objectMapper, curatorFramework); + cache = mock(ConfigurationsCache.class); + sensorIndexingConfigService = new SensorIndexingConfigServiceImpl(objectMapper, curatorFramework, cache); } @@ -114,44 +123,36 @@ public class SensorIndexingConfigServiceImplTest { public void findOneShouldProperlyReturnSensorEnrichmentConfig() throws Exception { final Map<String, Object> sensorIndexingConfig = getTestSensorIndexingConfig(); - GetDataBuilder getDataBuilder = mock(GetDataBuilder.class); - when(getDataBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot() + "/bro")).thenReturn(broJson.getBytes()); - when(curatorFramework.getData()).thenReturn(getDataBuilder); + IndexingConfigurations configs = new IndexingConfigurations(){ + @Override + public Map<String, Object> getConfigurations() { + return ImmutableMap.of(IndexingConfigurations.getKey("bro"), sensorIndexingConfig); + } + }; + when(cache.get( eq(IndexingConfigurations.class))) + .thenReturn(configs); + //We only have bro, so we should expect it to be returned assertEquals(getTestSensorIndexingConfig(), sensorIndexingConfigService.findOne("bro")); + //and blah should be a miss. 
+ assertNull(sensorIndexingConfigService.findOne("blah")); } - @Test - public void findOneShouldReturnNullWhenNoNodeExceptionIsThrown() throws Exception { - GetDataBuilder getDataBuilder = mock(GetDataBuilder.class); - when(getDataBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot() + "/bro")).thenThrow(KeeperException.NoNodeException.class); - - when(curatorFramework.getData()).thenReturn(getDataBuilder); - - assertNull(sensorIndexingConfigService.findOne("bro")); - } - - @Test - public void findOneShouldWrapNonNoNodeExceptionInRestException() throws Exception { - exception.expect(RestException.class); - GetDataBuilder getDataBuilder = mock(GetDataBuilder.class); - when(getDataBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot() + "/bro")).thenThrow(Exception.class); - when(curatorFramework.getData()).thenReturn(getDataBuilder); - - sensorIndexingConfigService.findOne("bro"); - } @Test public void getAllTypesShouldProperlyReturnTypes() throws Exception { - GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class); - when(getChildrenBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot())) - .thenReturn(new ArrayList() {{ - add("bro"); - add("squid"); - }}); - when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder); + IndexingConfigurations configs = new IndexingConfigurations(){ + @Override + public Map<String, Object> getConfigurations() { + return ImmutableMap.of(IndexingConfigurations.getKey("bro"), new HashMap<>() + ,IndexingConfigurations.getKey("squid"), new HashMap<>() + ); + } + }; + when(cache.get(eq(IndexingConfigurations.class))) + .thenReturn(configs); assertEquals(new ArrayList() {{ add("bro"); @@ -159,39 +160,18 @@ public class SensorIndexingConfigServiceImplTest { }}, sensorIndexingConfigService.getAllTypes()); } - @Test - public void getAllTypesShouldReturnNullWhenNoNodeExceptionIsThrown() throws Exception { - GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class); - 
when(getChildrenBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot())).thenThrow(KeeperException.NoNodeException.class); - when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder); - - assertEquals(new ArrayList<>(), sensorIndexingConfigService.getAllTypes()); - } @Test - public void getAllTypesShouldWrapNonNoNodeExceptionInRestException() throws Exception { - exception.expect(RestException.class); - - GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class); - when(getChildrenBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot())).thenThrow(Exception.class); - when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder); - - sensorIndexingConfigService.getAllTypes(); - } - - @Test - public void getAllShouldProperlyReturnSensorEnrichmentConfigs() throws Exception { + public void getAllShouldProperlyReturnIndexingConfigs() throws Exception { final Map<String, Object> sensorIndexingConfig = getTestSensorIndexingConfig(); - - GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class); - when(getChildrenBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot())) - .thenReturn(new ArrayList() {{ - add("bro"); - }}); - when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder); - GetDataBuilder getDataBuilder = mock(GetDataBuilder.class); - when(getDataBuilder.forPath(ConfigurationType.INDEXING.getZookeeperRoot() + "/bro")).thenReturn(broJson.getBytes()); - when(curatorFramework.getData()).thenReturn(getDataBuilder); + IndexingConfigurations configs = new IndexingConfigurations(){ + @Override + public Map<String, Object> getConfigurations() { + return ImmutableMap.of(IndexingConfigurations.getKey("bro"), sensorIndexingConfig ); + } + }; + when(cache.get(eq(IndexingConfigurations.class))) + .thenReturn(configs); assertEquals(new HashMap() {{ put("bro", sensorIndexingConfig);}}, sensorIndexingConfigService.getAll()); } 
http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java ---------------------------------------------------------------------- diff --git a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java index c96a796..7998c21 100644 --- a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java +++ b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/SensorParserConfigServiceImplTest.java @@ -18,6 +18,7 @@ package org.apache.metron.rest.service.impl; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableMap; import oi.thekraken.grok.api.Grok; import org.adrianwalker.multilinestring.Multiline; import org.apache.curator.framework.CuratorFramework; @@ -27,7 +28,9 @@ import org.apache.curator.framework.api.GetDataBuilder; import org.apache.curator.framework.api.SetDataBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.metron.common.configuration.ConfigurationType; +import org.apache.metron.common.configuration.ParserConfigurations; import org.apache.metron.common.configuration.SensorParserConfig; +import org.apache.metron.common.zookeeper.ConfigurationsCache; import org.apache.metron.rest.RestException; import org.apache.metron.rest.model.ParseMessageRequest; import org.apache.metron.rest.service.GrokService; @@ -95,6 +98,8 @@ public class SensorParserConfigServiceImplTest { private String user = "user1"; + ConfigurationsCache cache; + @Before public void setUp() throws Exception { objectMapper = mock(ObjectMapper.class); @@ -105,7 +110,8 @@ public class SensorParserConfigServiceImplTest { 
SecurityContextHolder.getContext().setAuthentication(authentication); when(environment.getProperty(GROK_TEMP_PATH_SPRING_PROPERTY)).thenReturn("./target"); grokService = new GrokServiceImpl(environment, mock(Grok.class), new HdfsServiceImpl(new Configuration())); - sensorParserConfigService = new SensorParserConfigServiceImpl(objectMapper, curatorFramework, grokService); + cache = mock(ConfigurationsCache.class); + sensorParserConfigService = new SensorParserConfigServiceImpl(objectMapper, curatorFramework, grokService, cache); } @@ -144,47 +150,36 @@ public class SensorParserConfigServiceImplTest { } @Test - public void findOneShouldProperlyReturnSensorEnrichmentConfig() throws Exception { + public void findOneShouldProperlyReturnSensorParserConfig() throws Exception { final SensorParserConfig sensorParserConfig = getTestBroSensorParserConfig(); - GetDataBuilder getDataBuilder = mock(GetDataBuilder.class); - when(getDataBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot() + "/bro")).thenReturn(broJson.getBytes()); - when(curatorFramework.getData()).thenReturn(getDataBuilder); + ParserConfigurations configs = new ParserConfigurations(){ + @Override + public Map<String, Object> getConfigurations() { + return ImmutableMap.of(ParserConfigurations.getKey("bro"), sensorParserConfig); + } + }; + when(cache.get(eq(ParserConfigurations.class))) + .thenReturn(configs); + //We only have bro, so we should expect it to be returned assertEquals(getTestBroSensorParserConfig(), sensorParserConfigService.findOne("bro")); - } - - @Test - public void findOneShouldReturnNullWhenNoNodeExceptionIsThrown() throws Exception { - GetDataBuilder getDataBuilder = mock(GetDataBuilder.class); - when(getDataBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot() + "/bro")).thenThrow(KeeperException.NoNodeException.class); - - when(curatorFramework.getData()).thenReturn(getDataBuilder); - - assertNull(sensorParserConfigService.findOne("bro")); - } - - @Test - public void 
findOneShouldWrapNonNoNodeExceptionInRestException() throws Exception { - exception.expect(RestException.class); - - GetDataBuilder getDataBuilder = mock(GetDataBuilder.class); - when(getDataBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot() + "/bro")).thenThrow(Exception.class); - - when(curatorFramework.getData()).thenReturn(getDataBuilder); - - sensorParserConfigService.findOne("bro"); + //and blah should be a miss. + assertNull(sensorParserConfigService.findOne("blah")); } @Test public void getAllTypesShouldProperlyReturnTypes() throws Exception { - GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class); - when(getChildrenBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot())) - .thenReturn(new ArrayList() {{ - add("bro"); - add("squid"); - }}); - when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder); + ParserConfigurations configs = new ParserConfigurations(){ + @Override + public Map<String, Object> getConfigurations() { + return ImmutableMap.of(ParserConfigurations.getKey("bro"), new HashMap<>() + ,ParserConfigurations.getKey("squid"), new HashMap<>() + ); + } + }; + when(cache.get( eq(ParserConfigurations.class))) + .thenReturn(configs); assertEquals(new ArrayList() {{ add("bro"); @@ -193,41 +188,19 @@ public class SensorParserConfigServiceImplTest { } @Test - public void getAllTypesShouldReturnEmptyListWhenNoNodeExceptionIsThrown() throws Exception { - GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class); - when(getChildrenBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot())).thenThrow(KeeperException.NoNodeException.class); - when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder); - - assertEquals(new ArrayList<>(), sensorParserConfigService.getAllTypes()); - } - - @Test - public void getAllTypesShouldWrapNonNoNodeExceptionInRestException() throws Exception { - exception.expect(RestException.class); - - GetChildrenBuilder getChildrenBuilder = 
mock(GetChildrenBuilder.class); - when(getChildrenBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot())).thenThrow(Exception.class); - when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder); - - sensorParserConfigService.getAllTypes(); - } - - @Test public void getAllShouldProperlyReturnSensorParserConfigs() throws Exception { - GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class); - when(getChildrenBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot())) - .thenReturn(new ArrayList() {{ - add("bro"); - add("squid"); - }}); - when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder); - final SensorParserConfig broSensorParserConfig = getTestBroSensorParserConfig(); final SensorParserConfig squidSensorParserConfig = getTestSquidSensorParserConfig(); - GetDataBuilder getDataBuilder = mock(GetDataBuilder.class); - when(getDataBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot() + "/bro")).thenReturn(broJson.getBytes()); - when(getDataBuilder.forPath(ConfigurationType.PARSER.getZookeeperRoot() + "/squid")).thenReturn(squidJson.getBytes()); - when(curatorFramework.getData()).thenReturn(getDataBuilder); + ParserConfigurations configs = new ParserConfigurations(){ + @Override + public Map<String, Object> getConfigurations() { + return ImmutableMap.of(ParserConfigurations.getKey("bro"), broSensorParserConfig + ,ParserConfigurations.getKey("squid"), squidSensorParserConfig + ); + } + }; + when(cache.get( eq(ParserConfigurations.class))) + .thenReturn(configs); assertEquals(new ArrayList() {{ add(getTestBroSensorParserConfig()); http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/pom.xml ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/pom.xml b/metron-platform/metron-common/pom.xml index 3054881..8734d63 100644 --- a/metron-platform/metron-common/pom.xml +++ b/metron-platform/metron-common/pom.xml @@ -54,6 +54,11 
@@ </dependency> <dependency> <groupId>org.apache.metron</groupId> + <artifactId>metron-zookeeper</artifactId> + <version>${project.parent.version}</version> + </dependency> + <dependency> + <groupId>org.apache.metron</groupId> <artifactId>metron-integration-test</artifactId> <version>${project.parent.version}</version> <scope>test</scope> @@ -289,11 +294,6 @@ <version>${global_jackson_version}</version> </dependency> <dependency> - <groupId>org.apache.curator</groupId> - <artifactId>curator-recipes</artifactId> - <version>${global_curator_version}</version> - </dependency> - <dependency> <groupId>org.apache.storm</groupId> <artifactId>flux-core</artifactId> <version>${global_flux_version}</version> http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java index a97091a..6f15746 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredBolt.java @@ -17,54 +17,58 @@ */ package org.apache.metron.common.bolt; -import java.io.IOException; import java.lang.invoke.MethodHandles; import java.util.Map; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.recipes.cache.TreeCache; import org.apache.curator.framework.recipes.cache.TreeCacheEvent; -import org.apache.curator.framework.recipes.cache.TreeCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.metron.common.Constants; import 
org.apache.metron.common.configuration.ConfigurationType; import org.apache.metron.common.configuration.Configurations; import org.apache.metron.common.configuration.ConfigurationsUtils; +import org.apache.metron.zookeeper.SimpleEventListener; +import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater; +import org.apache.metron.common.zookeeper.configurations.Reloadable; +import org.apache.metron.zookeeper.ZKCache; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.base.BaseRichBolt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public abstract class ConfiguredBolt<CONFIG_T extends Configurations> extends BaseRichBolt { +public abstract class ConfiguredBolt<CONFIG_T extends Configurations> extends BaseRichBolt implements Reloadable { private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private String zookeeperUrl; protected CuratorFramework client; - protected TreeCache cache; - private final CONFIG_T configurations = defaultConfigurations(); + protected ZKCache cache; + private final CONFIG_T configurations; public ConfiguredBolt(String zookeeperUrl) { this.zookeeperUrl = zookeeperUrl; + this.configurations = createUpdater().defaultConfigurations(); } public void setCuratorFramework(CuratorFramework client) { this.client = client; } - public void setTreeCache(TreeCache cache) { + public void setZKCache(ZKCache cache) { this.cache = cache; } + @Override public void reloadCallback(String name, ConfigurationType type) { } + public CONFIG_T getConfigurations() { return configurations; } - protected abstract CONFIG_T defaultConfigurations(); + protected abstract ConfigurationsUpdater<CONFIG_T> createUpdater(); @Override @@ -85,30 +89,30 @@ public abstract class ConfiguredBolt<CONFIG_T extends Configurations> extends Ba //zookeeper. 
ConfigurationsUtils.setupStellarStatically(client); if (cache == null) { - cache = new TreeCache(client, Constants.ZOOKEEPER_TOPOLOGY_ROOT); - TreeCacheListener listener = new TreeCacheListener() { - @Override - public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { - if (event.getType().equals(TreeCacheEvent.Type.NODE_ADDED) || event.getType().equals(TreeCacheEvent.Type.NODE_UPDATED)) { - String path = event.getData().getPath(); - byte[] data = event.getData().getData(); - updateConfig(path, data); - } - } - }; - cache.getListenable().addListener(listener); - loadConfig(); + ConfigurationsUpdater<CONFIG_T> updater = createUpdater(); + SimpleEventListener listener = new SimpleEventListener.Builder() + .with( updater::update + , TreeCacheEvent.Type.NODE_ADDED + , TreeCacheEvent.Type.NODE_UPDATED + ) + .with( updater::delete + , TreeCacheEvent.Type.NODE_REMOVED + ) + .build(); + cache = new ZKCache.Builder() + .withClient(client) + .withListener(listener) + .withRoot(Constants.ZOOKEEPER_TOPOLOGY_ROOT) + .build(); + updater.forceUpdate(client); + cache.start(); } - cache.start(); } catch (Exception e) { LOG.error(e.getMessage(), e); throw new RuntimeException(e); } } - abstract public void loadConfig(); - abstract public void updateConfig(String path, byte[] data) throws IOException; - @Override public void cleanup() { cache.close(); http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java index 9c3ee97..54fd7e8 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java +++ 
b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredEnrichmentBolt.java @@ -22,6 +22,8 @@ import java.lang.invoke.MethodHandles; import org.apache.metron.common.configuration.ConfigurationType; import org.apache.metron.common.configuration.ConfigurationsUtils; import org.apache.metron.common.configuration.EnrichmentConfigurations; +import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater; +import org.apache.metron.common.zookeeper.configurations.EnrichmentUpdater; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,31 +37,7 @@ public abstract class ConfiguredEnrichmentBolt extends ConfiguredBolt<Enrichment } @Override - protected EnrichmentConfigurations defaultConfigurations() { - return new EnrichmentConfigurations(); - } - - @Override - public void loadConfig() { - try { - - ConfigurationsUtils.updateEnrichmentConfigsFromZookeeper(getConfigurations(), client); - } catch (Exception e) { - LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily..."); - } - } - - @Override - public void updateConfig(String path, byte[] data) throws IOException { - if (data.length != 0) { - String name = path.substring(path.lastIndexOf("/") + 1); - if (path.startsWith(ConfigurationType.ENRICHMENT.getZookeeperRoot())) { - getConfigurations().updateSensorEnrichmentConfig(name, data); - reloadCallback(name, ConfigurationType.ENRICHMENT); - } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) { - getConfigurations().updateGlobalConfig(data); - reloadCallback(name, ConfigurationType.GLOBAL); - } - } + protected ConfigurationsUpdater<EnrichmentConfigurations> createUpdater() { + return new EnrichmentUpdater(this, this::getConfigurations); } } http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java ---------------------------------------------------------------------- diff 
--git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java index cddcada..09300e4 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredIndexingBolt.java @@ -22,6 +22,8 @@ import java.lang.invoke.MethodHandles; import org.apache.metron.common.configuration.ConfigurationType; import org.apache.metron.common.configuration.ConfigurationsUtils; import org.apache.metron.common.configuration.IndexingConfigurations; +import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater; +import org.apache.metron.common.zookeeper.configurations.IndexingUpdater; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,30 +35,8 @@ public abstract class ConfiguredIndexingBolt extends ConfiguredBolt<IndexingConf } @Override - protected IndexingConfigurations defaultConfigurations() { - return new IndexingConfigurations(); + protected ConfigurationsUpdater<IndexingConfigurations> createUpdater() { + return new IndexingUpdater(this, this::getConfigurations); } - @Override - public void loadConfig() { - try { - ConfigurationsUtils.updateSensorIndexingConfigsFromZookeeper(getConfigurations(), client); - } catch (Exception e) { - LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily..."); - } - } - - @Override - public void updateConfig(String path, byte[] data) throws IOException { - if (data.length != 0) { - String name = path.substring(path.lastIndexOf("/") + 1); - if (path.startsWith(ConfigurationType.INDEXING.getZookeeperRoot())) { - getConfigurations().updateSensorIndexingConfig(name, data); - reloadCallback(name, ConfigurationType.INDEXING); - } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) { - 
getConfigurations().updateGlobalConfig(data); - reloadCallback(name, ConfigurationType.GLOBAL); - } - } - } } http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java index 99313fa..2f13658 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredParserBolt.java @@ -23,6 +23,8 @@ import org.apache.metron.common.configuration.ConfigurationType; import org.apache.metron.common.configuration.ConfigurationsUtils; import org.apache.metron.common.configuration.ParserConfigurations; import org.apache.metron.common.configuration.SensorParserConfig; +import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater; +import org.apache.metron.common.zookeeper.configurations.ParserUpdater; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,34 +43,14 @@ public abstract class ConfiguredParserBolt extends ConfiguredBolt<ParserConfigur return getConfigurations().getSensorParserConfig(sensorType); } - @Override - protected ParserConfigurations defaultConfigurations() { - return new ParserConfigurations(); - } - public String getSensorType() { return sensorType; } - @Override - public void loadConfig() { - try { - ConfigurationsUtils.updateParserConfigsFromZookeeper(getConfigurations(), client); - } catch (Exception e) { - LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily..."); - } - } + @Override - public void updateConfig(String path, byte[] data) throws IOException { - if (data.length != 0) { - 
String name = path.substring(path.lastIndexOf("/") + 1); - if (path.startsWith(ConfigurationType.PARSER.getZookeeperRoot())) { - getConfigurations().updateSensorParserConfig(name, data); - reloadCallback(name, ConfigurationType.PARSER); - } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) { - getConfigurations().updateGlobalConfig(data); - reloadCallback(name, ConfigurationType.GLOBAL); - } - } + protected ConfigurationsUpdater<ParserConfigurations> createUpdater() { + return new ParserUpdater(this, this::getConfigurations); } + } http://git-wip-us.apache.org/repos/asf/metron/blob/cc111ec9/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java ---------------------------------------------------------------------- diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java index 22ff3a9..90575d0 100644 --- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java +++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/bolt/ConfiguredProfilerBolt.java @@ -17,16 +17,12 @@ */ package org.apache.metron.common.bolt; -import static org.apache.metron.common.configuration.ConfigurationType.PROFILER; -import java.io.ByteArrayInputStream; -import java.io.IOException; import java.lang.invoke.MethodHandles; -import org.apache.curator.framework.CuratorFramework; -import org.apache.metron.common.configuration.ConfigurationType; import org.apache.metron.common.configuration.profiler.ProfilerConfig; import org.apache.metron.common.configuration.profiler.ProfilerConfigurations; -import org.apache.metron.common.utils.JSONUtils; +import org.apache.metron.common.zookeeper.configurations.ConfigurationsUpdater; +import org.apache.metron.common.zookeeper.configurations.ProfilerUpdater; import 
org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,43 +42,8 @@ public abstract class ConfiguredProfilerBolt extends ConfiguredBolt<ProfilerConf } @Override - protected ProfilerConfigurations defaultConfigurations() { - return new ProfilerConfigurations(); + protected ConfigurationsUpdater<ProfilerConfigurations> createUpdater() { + return new ProfilerUpdater(this, this::getConfigurations); } - @Override - public void loadConfig() { - try { - ProfilerConfig config = readFromZookeeper(client); - if(config != null) { - getConfigurations().updateProfilerConfig(config); - } - - } catch (Exception e) { - LOG.warn("Unable to load configs from zookeeper, but the cache should load lazily..."); - } - } - - private ProfilerConfig readFromZookeeper(CuratorFramework client) throws Exception { - byte[] raw = client.getData().forPath(PROFILER.getZookeeperRoot()); - return JSONUtils.INSTANCE.load(new ByteArrayInputStream(raw), ProfilerConfig.class); - } - - @Override - public void updateConfig(String path, byte[] data) throws IOException { - if (data.length != 0) { - String name = path.substring(path.lastIndexOf("/") + 1); - - // update the profiler configuration from zookeeper - if (path.startsWith(ConfigurationType.PROFILER.getZookeeperRoot())) { - getConfigurations().updateProfilerConfig(data); - reloadCallback(name, ConfigurationType.PROFILER); - - // update the global configuration from zookeeper - } else if (ConfigurationType.GLOBAL.getZookeeperRoot().equals(path)) { - getConfigurations().updateGlobalConfig(data); - reloadCallback(name, ConfigurationType.GLOBAL); - } - } - } }
