This is an automated email from the ASF dual-hosted git repository.
mmiklavcic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/master by this push:
new 3c13173 METRON-2141 Cache REST API status update calls to the Storm
UI (mmiklavc) closes apache/metron#1439
3c13173 is described below
commit 3c1317360243398eef8097edcea5f7c1a0bb2f59
Author: mmiklavc <[email protected]>
AuthorDate: Wed Jun 12 17:13:16 2019 -0600
METRON-2141 Cache REST API status update calls to the Storm UI (mmiklavc)
closes apache/metron#1439
---
.../CURRENT/configuration/metron-rest-env.xml | 13 +-
.../CURRENT/package/scripts/params/params_linux.py | 2 +
.../METRON/CURRENT/package/templates/metron.j2 | 3 +
.../METRON/CURRENT/themes/metron_theme.json | 20 +++
.../apache/metron/rest/model/TopologySummary.java | 11 +-
metron-interface/metron-rest/README.md | 24 ++--
.../src/main/config/rest_application.yml | 4 +
.../apache/metron/rest/MetronRestConstants.java | 2 +
.../org/apache/metron/rest/config/StormConfig.java | 21 ++-
.../service/impl/CachedStormStatusServiceImpl.java | 127 +++++++++++++++++
.../rest/service/impl/StormStatusServiceImpl.java | 14 +-
.../org/apache/metron/rest/config/TestConfig.java | 12 ++
.../impl/CachedStormStatusServiceImplTest.java | 158 +++++++++++++++++++++
13 files changed, 385 insertions(+), 26 deletions(-)
diff --git
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml
index 68b1140..145b64e 100644
---
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml
+++
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-rest-env.xml
@@ -181,5 +181,16 @@
<description>The field name where the threat triage score can be found
in the search indices. This setting primarily affects the Alerts
UI.</description>
<value>threat:triage:score</value>
</property>
-
+ <property>
+ <name>storm_status_cache_max_size</name>
+ <value>10000</value>
+ <description>The maximum size for the cache that fronts calls to the
Storm API for topology status.</description>
+ <display-name>Storm Status Cache Max Size</display-name>
+ </property>
+ <property>
+ <name>storm_status_cache_timeout_seconds</name>
+ <value>5</value>
+ <description>Duration in seconds for cache entries to timeout. Note
that the higher the value, the more stale the returned value will
be.</description>
+ <display-name>Storm Status Cache Timeout Seconds</display-name>
+ </property>
</configuration>
diff --git
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
index de6b8bc..a7f20fc 100755
---
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
+++
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
@@ -64,6 +64,8 @@ metron_alerts_ui_host = status_params.metron_alerts_ui_host
metron_alerts_ui_port = status_params.metron_alerts_ui_port
metron_alerts_ui_path = metron_home + '/web/alerts-ui/'
metron_jvm_flags =
config['configurations']['metron-rest-env']['metron_jvm_flags']
+storm_status_cache_max_size =
config['configurations']['metron-rest-env']['storm_status_cache_max_size']
+storm_status_cache_timeout_seconds =
config['configurations']['metron-rest-env']['storm_status_cache_timeout_seconds']
# Construct the profiles as a temp variable first. Only the first time it's
set will carry through
metron_spring_profiles_active =
config['configurations']['metron-rest-env']['metron_spring_profiles_active']
diff --git
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2
index 936118c..5c43bbd 100644
---
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2
+++
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/templates/metron.j2
@@ -66,3 +66,6 @@ PCAP_FINAL_OUTPUT_PATH="{{pcap_final_output_path}}"
PCAP_PAGE_SIZE="{{pcap_page_size}}"
PCAP_YARN_QUEUE="{{pcap_yarn_queue}}"
PCAP_FINALIZER_THREADPOOL_SIZE="{{pcap_finalizer_threadpool_size}}"
+STORM_STATUS_CACHE_MAX_SIZE="{{storm_status_cache_max_size}}"
+STORM_STATUS_CACHE_TIMEOUT_SECONDS="{{storm_status_cache_timeout_seconds}}"
+
diff --git
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
index 69084e3..a9b7322 100644
---
a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
+++
b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/themes/metron_theme.json
@@ -915,6 +915,14 @@
"subsection-name": "subsection-rest"
},
{
+ "config": "metron-rest-env/storm_status_cache_max_size",
+ "subsection-name": "subsection-rest"
+ },
+ {
+ "config": "metron-rest-env/storm_status_cache_timeout_seconds",
+ "subsection-name": "subsection-rest"
+ },
+ {
"config": "metron-management-ui-env/metron_management_ui_port",
"subsection-name": "subsection-management-ui"
},
@@ -1648,6 +1656,18 @@
}
},
{
+ "config": "metron-rest-env/storm_status_cache_max_size",
+ "widget": {
+ "type": "text-field"
+ }
+ },
+ {
+ "config": "metron-rest-env/storm_status_cache_timeout_seconds",
+ "widget": {
+ "type": "text-field"
+ }
+ },
+ {
"config": "metron-pcap-env/pcap_page_size",
"widget": {
"type": "text-field"
diff --git
a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/TopologySummary.java
b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/TopologySummary.java
index 8621daf..5bd854b 100644
---
a/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/TopologySummary.java
+++
b/metron-interface/metron-rest-client/src/main/java/org/apache/metron/rest/model/TopologySummary.java
@@ -21,9 +21,16 @@ import java.util.Arrays;
public class TopologySummary {
- private TopologyStatus[] topologies;
+ private TopologyStatus[] topologies;
- public TopologyStatus[] getTopologies() {
+ public TopologySummary() {
+ }
+
+ public TopologySummary(TopologyStatus[] topologies) {
+ this.topologies = topologies;
+ }
+
+ public TopologyStatus[] getTopologies() {
if (topologies == null) {
return new TopologyStatus[0];
}
diff --git a/metron-interface/metron-rest/README.md
b/metron-interface/metron-rest/README.md
index c76a402..aa28260 100644
--- a/metron-interface/metron-rest/README.md
+++ b/metron-interface/metron-rest/README.md
@@ -79,17 +79,19 @@ No optional parameter has a default.
| HDFS_URL | HDFS url or `fs.defaultFS` Hadoop
setting (ex. hdfs://node1:8020)
### Optional - With Defaults
-| Environment Variable | Description
| Required | Default
-| ------------------------------------- |
------------------------------------------------------------------------------------
| -------- | -------
-| METRON_LOG_DIR | Directory where the log file is
written | Optional |
/var/log/metron/
-| METRON_PID_FILE | File where the pid is written
| Optional | /var/run/metron/
-| METRON_REST_PORT | REST application port
| Optional | 8082
-| METRON_JDBC_CLIENT_PATH | Path to JDBC client jar
| Optional | H2 is bundled
-| METRON_TEMP_GROK_PATH | Temporary directory used to test
grok statements | Optional | ./patterns/temp
-| METRON_DEFAULT_GROK_PATH | Defaults HDFS directory used to
store grok statements | Optional |
/apps/metron/patterns
-| SECURITY_ENABLED | Enables Kerberos support
| Optional | false
-| METRON_USER_ROLE | Name of the role at the
authentication provider that provides user access to Metron. | Optional | USER
-| METRON_ADMIN_ROLE | Name of the role at the
authentication provider that provides administrative access to Metron.|
Optional | ADMIN
+| Environment Variable | Description
| Required | Default
+| ------------------------------------- |
-------------------------------------------------------------------------------------------------------------------------------------
| -------- | -------
+| METRON_LOG_DIR | Directory where the log file is
written
| Optional | /var/log/metron/
+| METRON_PID_FILE | File where the pid is written
| Optional | /var/run/metron/
+| METRON_REST_PORT | REST application port
| Optional | 8082
+| METRON_JDBC_CLIENT_PATH | Path to JDBC client jar
| Optional | H2 is bundled
+| METRON_TEMP_GROK_PATH | Temporary directory used to test
grok statements
| Optional | ./patterns/temp
+| METRON_DEFAULT_GROK_PATH | Defaults HDFS directory used to
store grok statements
| Optional | /apps/metron/patterns
+| SECURITY_ENABLED | Enables Kerberos support
| Optional | false
+| METRON_USER_ROLE | Name of the role at the
authentication provider that provides user access to Metron.
| Optional | USER
+| METRON_ADMIN_ROLE | Name of the role at the
authentication provider that provides administrative access to Metron.
| Optional | ADMIN
+| STORM_STATUS_CACHE_MAX_SIZE | The maximum size for the cache that
fronts calls to the Storm API for topology status.
| Optional | 10000
+| STORM_STATUS_CACHE_TIMEOUT_SECONDS | Duration in seconds for cache
entries to timeout. Note that the higher the value, the more stale the returned
value will be. | Optional | 5
### Optional - Blank Defaults
| Environment Variable | Description
| Required
diff --git a/metron-interface/metron-rest/src/main/config/rest_application.yml
b/metron-interface/metron-rest/src/main/config/rest_application.yml
index b6709a9..48cc5b4 100644
--- a/metron-interface/metron-rest/src/main/config/rest_application.yml
+++ b/metron-interface/metron-rest/src/main/config/rest_application.yml
@@ -40,6 +40,10 @@ storm:
indexing:
randomaccess.script.path:
${METRON_HOME}/bin/start_elasticsearch_topology.sh
batch.script.path: ${METRON_HOME}/bin/start_hdfs_topology.sh
+ status:
+ cache:
+ max.size: ${STORM_STATUS_CACHE_MAX_SIZE}
+ timeout.seconds: ${STORM_STATUS_CACHE_TIMEOUT_SECONDS}
kerberos:
enabled: ${SECURITY_ENABLED}
diff --git
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
index b8a8306..54f721c 100644
---
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
+++
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/MetronRestConstants.java
@@ -33,6 +33,8 @@ public class MetronRestConstants {
public static final String GROK_PATH_KEY = "grokPath";
public static final String STORM_UI_SPRING_PROPERTY = "storm.ui.url";
+ public static final String STORM_STATUS_CACHE_MAX_SIZE =
"storm.status.cache.max.size";
+ public static final String STORM_STATUS_CACHE_TIMEOUT_SECONDS =
"storm.status.cache.timeout.seconds";
public static final String SUPERVISOR_SUMMARY_URL =
"/api/v1/supervisor/summary";
public static final String TOPOLOGY_SUMMARY_URL = "/api/v1/topology/summary";
public static final String TOPOLOGY_URL = "/api/v1/topology";
diff --git
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/StormConfig.java
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/StormConfig.java
index 7a61cbc..7d31ce1 100644
---
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/StormConfig.java
+++
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/config/StormConfig.java
@@ -17,19 +17,22 @@
*/
package org.apache.metron.rest.config;
+import static org.apache.metron.rest.MetronRestConstants.DOCKER_PROFILE;
+import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
+
+import java.util.Arrays;
+import org.apache.metron.rest.MetronRestConstants;
+import org.apache.metron.rest.service.StormStatusService;
+import org.apache.metron.rest.service.impl.CachedStormStatusServiceImpl;
import org.apache.metron.rest.service.impl.DockerStormCLIWrapper;
import org.apache.metron.rest.service.impl.StormCLIWrapper;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.env.Environment;
-import java.util.Arrays;
-
-import static org.apache.metron.rest.MetronRestConstants.DOCKER_PROFILE;
-import static org.apache.metron.rest.MetronRestConstants.TEST_PROFILE;
-
@Configuration
@Profile("!" + TEST_PROFILE)
public class StormConfig {
@@ -45,4 +48,12 @@ public class StormConfig {
return new StormCLIWrapper();
}
}
+
+ @Bean
+ public StormStatusService stormStatusService(
+ @Autowired @Qualifier("StormStatusServiceImpl") StormStatusService
wrappedService) {
+ long maxCacheSize =
environment.getProperty(MetronRestConstants.STORM_STATUS_CACHE_MAX_SIZE,
Long.class, 10000L);
+ long maxCacheTimeoutSeconds =
environment.getProperty(MetronRestConstants.STORM_STATUS_CACHE_TIMEOUT_SECONDS,
Long.class, 5L);
+ return new CachedStormStatusServiceImpl(wrappedService, maxCacheSize,
maxCacheTimeoutSeconds);
+ }
}
diff --git
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/CachedStormStatusServiceImpl.java
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/CachedStormStatusServiceImpl.java
new file mode 100644
index 0000000..788d8f5
--- /dev/null
+++
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/CachedStormStatusServiceImpl.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.rest.service.impl;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import java.lang.invoke.MethodHandles;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.metron.rest.model.SupervisorSummary;
+import org.apache.metron.rest.model.TopologyResponse;
+import org.apache.metron.rest.model.TopologyStatus;
+import org.apache.metron.rest.model.TopologySummary;
+import org.apache.metron.rest.service.StormStatusService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Decorator around the Storm status service that caches results.
+ */
+public class CachedStormStatusServiceImpl implements StormStatusService {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private enum CacheKey {
+ SUPERVISOR_SUMMARY,
+ TOPOLOGY_SUMMARY,
+ TOPOLOGY_STATUS,
+ ALL_TOPOLOGY_STATUS
+ }
+
+ private StormStatusService stormService;
+ // cache is thread-safe
+ // key will be a base CacheKey or the base + a suffix. See for example
getTopologyStatus(String name).
+ private Cache<Object, Object> statusCache;
+
+ /**
+ *
+ * @param stormService service to decorate and delegate calls to.
+ * @param maxCacheSize max number of records in the backing cache.
+ * @param cacheExpirationSeconds number of seconds before the cache will
expire an entry.
+ */
+ public CachedStormStatusServiceImpl(StormStatusService stormService, long
maxCacheSize,
+ long cacheExpirationSeconds) {
+ this.stormService = stormService;
+ LOG.info("Creating Storm service cache with max size '{}', record
expiration seconds '{}'",
+ maxCacheSize, cacheExpirationSeconds);
+ Caffeine builder = Caffeine.newBuilder().maximumSize(maxCacheSize)
+ .expireAfterWrite(cacheExpirationSeconds, TimeUnit.SECONDS);
+ statusCache = builder.build();
+ }
+
+ @Override
+ public SupervisorSummary getSupervisorSummary() {
+ return (SupervisorSummary) statusCache
+ .get(CacheKey.SUPERVISOR_SUMMARY, cacheKey -> {
+ LOG.debug("Loading new supervisor summary");
+ return stormService.getSupervisorSummary();
+ });
+ }
+
+ @Override
+ public TopologySummary getTopologySummary() {
+ return (TopologySummary) statusCache
+ .get(CacheKey.TOPOLOGY_SUMMARY, cacheKey -> {
+ LOG.debug("Loading new topology summary");
+ return stormService.getTopologySummary();
+ });
+ }
+
+ /**
+ * Rather than worry about coalescing individual topology statuses with the
call to get all topology statuses,
+ * we handle them independently.
+ * @param name topology name.
+ * @return status for this topolopgy.
+ */
+ @Override
+ public TopologyStatus getTopologyStatus(String name) {
+ return (TopologyStatus) statusCache
+ .get(CacheKey.TOPOLOGY_STATUS + name, cacheKey -> {
+ LOG.debug("Loading new topology status for '{}'", name);
+ return stormService.getTopologyStatus(name);
+ });
+ }
+
+ @Override
+ public List<TopologyStatus> getAllTopologyStatus() {
+ return (List<TopologyStatus>) statusCache
+ .get(CacheKey.ALL_TOPOLOGY_STATUS, cacheKey -> {
+ LOG.debug("Loading all topology status");
+ return stormService.getAllTopologyStatus();
+ });
+ }
+
+ @Override
+ public TopologyResponse activateTopology(String name) {
+ return stormService.activateTopology(name);
+ }
+
+ @Override
+ public TopologyResponse deactivateTopology(String name) {
+ return stormService.deactivateTopology(name);
+ }
+
+ /**
+ * Resets the cache, i.e. empties it.
+ */
+ public void reset() {
+ statusCache.invalidateAll();
+ }
+}
diff --git
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormStatusServiceImpl.java
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormStatusServiceImpl.java
index 25df549..081bd4e 100644
---
a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormStatusServiceImpl.java
+++
b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/StormStatusServiceImpl.java
@@ -15,16 +15,19 @@
package org.apache.metron.rest.service.impl;
+import static
org.apache.metron.rest.MetronRestConstants.STORM_UI_SPRING_PROPERTY;
+import static
org.apache.metron.rest.MetronRestConstants.SUPERVISOR_SUMMARY_URL;
+import static org.apache.metron.rest.MetronRestConstants.TOPOLOGY_SUMMARY_URL;
+import static org.apache.metron.rest.MetronRestConstants.TOPOLOGY_URL;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-
import org.apache.metron.common.configuration.SensorParserGroup;
import org.apache.metron.parsers.topology.ParserTopologyCLI;
-import org.apache.metron.rest.RestException;
import org.apache.metron.rest.model.SupervisorSummary;
import org.apache.metron.rest.model.TopologyResponse;
import org.apache.metron.rest.model.TopologyStatus;
@@ -33,15 +36,12 @@ import org.apache.metron.rest.model.TopologySummary;
import org.apache.metron.rest.service.SensorParserGroupService;
import org.apache.metron.rest.service.StormStatusService;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
-import static
org.apache.metron.rest.MetronRestConstants.STORM_UI_SPRING_PROPERTY;
-import static
org.apache.metron.rest.MetronRestConstants.SUPERVISOR_SUMMARY_URL;
-import static org.apache.metron.rest.MetronRestConstants.TOPOLOGY_SUMMARY_URL;
-import static org.apache.metron.rest.MetronRestConstants.TOPOLOGY_URL;
-
+@Qualifier("StormStatusServiceImpl")
@Service
public class StormStatusServiceImpl implements StormStatusService {
diff --git
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
index 920c7ab..d42f128 100644
---
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
+++
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/config/TestConfig.java
@@ -60,9 +60,13 @@ import org.apache.metron.rest.mock.MockPcapJobSupplier;
import org.apache.metron.rest.mock.MockPcapToPdmlScriptWrapper;
import org.apache.metron.rest.mock.MockStormCLIClientWrapper;
import org.apache.metron.rest.mock.MockStormRestTemplate;
+import org.apache.metron.rest.service.StormStatusService;
+import org.apache.metron.rest.service.impl.CachedStormStatusServiceImpl;
import org.apache.metron.rest.service.impl.PcapToPdmlScriptWrapper;
import org.apache.metron.rest.service.impl.StormCLIWrapper;
import org.apache.metron.rest.user.UserSettingsClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
@@ -237,4 +241,12 @@ public class TestConfig {
public PcapToPdmlScriptWrapper pcapToPdmlScriptWrapper() {
return new MockPcapToPdmlScriptWrapper();
}
+
+ @Bean
+ public StormStatusService stormStatusService(
+ @Autowired @Qualifier("StormStatusServiceImpl") StormStatusService
wrappedService) {
+ long maxCacheSize = 0L;
+ long maxCacheTimeoutSeconds = 0L;
+ return new CachedStormStatusServiceImpl(wrappedService, maxCacheSize,
maxCacheTimeoutSeconds);
+ }
}
diff --git
a/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/CachedStormStatusServiceImplTest.java
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/CachedStormStatusServiceImplTest.java
new file mode 100644
index 0000000..993465f
--- /dev/null
+++
b/metron-interface/metron-rest/src/test/java/org/apache/metron/rest/service/impl/CachedStormStatusServiceImplTest.java
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.rest.service.impl;
+
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import org.apache.metron.rest.model.SupervisorStatus;
+import org.apache.metron.rest.model.SupervisorSummary;
+import org.apache.metron.rest.model.TopologyResponse;
+import org.apache.metron.rest.model.TopologyStatus;
+import org.apache.metron.rest.model.TopologySummary;
+import org.apache.metron.rest.service.StormStatusService;
+import org.hamcrest.CoreMatchers;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+public class CachedStormStatusServiceImplTest {
+
+ @Mock
+ private StormStatusService stormService;
+ private CachedStormStatusServiceImpl cachedStormStatusService;
+
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+ cachedStormStatusService = new CachedStormStatusServiceImpl(stormService,
150, 30);
+ }
+
+ @Test
+ public void caches_supervisor_summary() {
+ SupervisorStatus supervisorStatus1 = new SupervisorStatus();
+ SupervisorStatus supervisorStatus2 = new SupervisorStatus();
+ SupervisorSummary supervisorSummary = new SupervisorSummary(
+ new SupervisorStatus[]{supervisorStatus1, supervisorStatus2});
+ when(stormService.getSupervisorSummary()).thenReturn(supervisorSummary);
+ // should hit the cache
+ for (int i = 0; i < 100; i++) {
+ cachedStormStatusService.getSupervisorSummary();
+ }
+ SupervisorSummary summary =
cachedStormStatusService.getSupervisorSummary();
+ assertThat("Number of supervisors did not match.",
summary.getSupervisors().length,
+ CoreMatchers.equalTo(2));
+ verify(stormService, times(1)).getSupervisorSummary();
+ cachedStormStatusService.reset();
+ summary = cachedStormStatusService.getSupervisorSummary();
+ assertThat("Number of supervisors did not match.",
summary.getSupervisors().length,
+ CoreMatchers.equalTo(2));
+ verify(stormService, times(2)).getSupervisorSummary();
+ }
+
+ @Test
+ public void caches_topology_summary() {
+ TopologyStatus topologyStatus1 = new TopologyStatus();
+ TopologyStatus topologyStatus2 = new TopologyStatus();
+ TopologySummary topologySummary = new TopologySummary(
+ new TopologyStatus[]{topologyStatus1, topologyStatus2});
+ when(stormService.getTopologySummary()).thenReturn(topologySummary);
+ // should hit the cache
+ for (int i = 0; i < 100; i++) {
+ cachedStormStatusService.getTopologySummary();
+ }
+ TopologySummary summary = cachedStormStatusService.getTopologySummary();
+ assertThat("Number of topologies did not match.",
summary.getTopologies().length,
+ CoreMatchers.equalTo(2));
+ verify(stormService, times(1)).getTopologySummary();
+ cachedStormStatusService.reset();
+ summary = cachedStormStatusService.getTopologySummary();
+ assertThat("Number of topologies did not match.",
summary.getTopologies().length,
+ CoreMatchers.equalTo(2));
+ verify(stormService, times(2)).getTopologySummary();
+ }
+
+ @Test
+ public void caches_topology_status_by_name() {
+ String topologyName1 = "topology-1";
+ String topologyName2 = "topology-2";
+ TopologyStatus topologyStatus1 = new TopologyStatus();
+ topologyStatus1.setName(topologyName1);
+ TopologyStatus topologyStatus2 = new TopologyStatus();
+ topologyStatus2.setName(topologyName2);
+
when(stormService.getTopologyStatus(topologyName1)).thenReturn(topologyStatus1);
+
when(stormService.getTopologyStatus(topologyName2)).thenReturn(topologyStatus2);
+ // should hit the cache
+ for (int i = 0; i < 100; i++) {
+ cachedStormStatusService.getTopologyStatus(topologyName1);
+ cachedStormStatusService.getTopologyStatus(topologyName2);
+ }
+ TopologyStatus status1 =
cachedStormStatusService.getTopologyStatus(topologyName1);
+ TopologyStatus status2 =
cachedStormStatusService.getTopologyStatus(topologyName2);
+ assertThat("Name did not match for topology 1.", status1.getName(),
+ CoreMatchers.equalTo(topologyName1));
+ assertThat("Name did not match for topology 2.", status2.getName(),
+ CoreMatchers.equalTo(topologyName2));
+ verify(stormService, times(1)).getTopologyStatus(topologyName1);
+ verify(stormService, times(1)).getTopologyStatus(topologyName2);
+ cachedStormStatusService.reset();
+ cachedStormStatusService.getTopologyStatus(topologyName1);
+ cachedStormStatusService.getTopologyStatus(topologyName2);
+ verify(stormService, times(2)).getTopologyStatus(topologyName1);
+ verify(stormService, times(2)).getTopologyStatus(topologyName2);
+ }
+
+ @Test
+ public void caches_all_topology_status() {
+ TopologyStatus topologyStatus1 = new TopologyStatus();
+ TopologyStatus topologyStatus2 = new TopologyStatus();
+ List<TopologyStatus> allTopologyStatus = ImmutableList.of(topologyStatus1,
topologyStatus2);
+ when(stormService.getAllTopologyStatus()).thenReturn(allTopologyStatus);
+ // should hit the cache
+ for (int i = 0; i < 100; i++) {
+ cachedStormStatusService.getAllTopologyStatus();
+ }
+ List<TopologyStatus> allStatus =
cachedStormStatusService.getAllTopologyStatus();
+ assertThat("Number of topologies returned by all topology status check did
not match.",
+ allStatus.size(), CoreMatchers.equalTo(2));
+ verify(stormService, times(1)).getAllTopologyStatus();
+ cachedStormStatusService.reset();
+ cachedStormStatusService.getAllTopologyStatus();
+ verify(stormService, times(2)).getAllTopologyStatus();
+ }
+
+ @Test
+ public void admin_functions_act_as_simple_passthroughs_to_storm_service() {
+ TopologyResponse topologyResponse = new TopologyResponse();
+
when(stormService.activateTopology(anyString())).thenReturn(topologyResponse);
+
when(stormService.deactivateTopology(anyString())).thenReturn(topologyResponse);
+ for (int i = 0; i < 100; i++) {
+ cachedStormStatusService.activateTopology("foo");
+ cachedStormStatusService.deactivateTopology("foo");
+ }
+ verify(stormService, times(100)).activateTopology(anyString());
+ verify(stormService, times(100)).deactivateTopology(anyString());
+ }
+}