Repository: incubator-atlas Updated Branches: refs/heads/master bf377abbb -> 92d028178
ATLAS-1436: Metrics caching and UTs (Part 2) Signed-off-by: Madhan Neethiraj <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/92d02817 Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/92d02817 Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/92d02817 Branch: refs/heads/master Commit: 92d02817846d9431042923182f7feeb1a3a2a7d2 Parents: bf377ab Author: apoorvnaik <[email protected]> Authored: Wed Jan 25 15:35:28 2017 -0800 Committer: Madhan Neethiraj <[email protected]> Committed: Thu Jan 26 02:20:47 2017 -0800 ---------------------------------------------------------------------- distro/src/conf/atlas-application.properties | 1 + .../atlas/model/metrics/AtlasMetrics.java | 2 +- .../apache/atlas/services/MetricsService.java | 161 +++++++++++++------ .../atlas/services/MetricsServiceTest.java | 112 +++++++++++++ 4 files changed, 229 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/92d02817/distro/src/conf/atlas-application.properties ---------------------------------------------------------------------- diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties index 7f79ad7..303ce7b 100755 --- a/distro/src/conf/atlas-application.properties +++ b/distro/src/conf/atlas-application.properties @@ -207,6 +207,7 @@ atlas.feature.taxonomy.enable=true ############ Atlas Metric/Stats configs ################ # Format: atlas.metric.query.<key>.<name> +atlas.metric.query.cache.ttlInSecs=900 #atlas.metric.query.general.typeCount= #atlas.metric.query.general.typeUnusedCount= #atlas.metric.query.general.entityCount= http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/92d02817/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java ---------------------------------------------------------------------- diff --git a/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java b/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java index 602cdb4..edf3cc5 100644 --- a/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java +++ b/intg/src/main/java/org/apache/atlas/model/metrics/AtlasMetrics.java @@ -61,7 +61,7 @@ public class AtlasMetrics { } @JsonIgnore - public void addData(String groupKey, String key, Integer value) { + public void addData(String groupKey, String key, Number value) { Map<String, Map<String, Number>> data = this.data; if (data == null) { data = new HashMap<>(); http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/92d02817/repository/src/main/java/org/apache/atlas/services/MetricsService.java ---------------------------------------------------------------------- diff --git a/repository/src/main/java/org/apache/atlas/services/MetricsService.java b/repository/src/main/java/org/apache/atlas/services/MetricsService.java index 855e402..e2cc369 100644 --- a/repository/src/main/java/org/apache/atlas/services/MetricsService.java +++ b/repository/src/main/java/org/apache/atlas/services/MetricsService.java @@ -17,18 +17,21 @@ */ package org.apache.atlas.services; +import com.google.common.annotations.VisibleForTesting; import com.google.inject.Singleton; import org.apache.atlas.ApplicationProperties; import org.apache.atlas.AtlasException; import org.apache.atlas.model.metrics.AtlasMetrics; import org.apache.atlas.repository.graph.AtlasGraphProvider; import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.type.AtlasTypeRegistry; import org.apache.commons.configuration.Configuration; -import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.inject.Inject; import javax.script.ScriptException; +import java.util.Collection; import java.util.List; import java.util.Map; @@ -36,54 +39,128 @@ import java.util.Map; public class MetricsService { private static final Logger LOG = LoggerFactory.getLogger(MetricsService.class); - public static final String METRIC_QUERY_PREFIX = "atlas.metric.query."; + public static final String METRIC_QUERY_PREFIX = "atlas.metric.query."; + public static final String METRIC_QUERY_CACHE_TTL = "atlas.metric.query.cache.ttlInSecs"; + public static final int DEFAULT_CACHE_TTL_IN_SECS = 900; - public static final String TYPE = "type"; - public static final String ENTITY = "entity"; - public static final String TAG = "tag"; + public static final String TYPE = "type"; + public static final String ENTITY = "entity"; + public static final String TAG = "tag"; public static final String GENERAL = "general"; - public static final String METRIC_TYPE_COUNT = TYPE + "Count"; + public static final String METRIC_TYPE_COUNT = TYPE + "Count"; public static final String METRIC_TYPE_UNUSED_COUNT = TYPE + "UnusedCount"; - public static final String METRIC_TYPE_ENTITIES = TYPE + "Entities"; + public static final String METRIC_TYPE_ENTITIES = TYPE + "Entities"; - public static final String METRIC_ENTITY_COUNT = ENTITY + "Count"; - public static final String METRIC_ENTITY_DELETED = ENTITY + "Deleted"; + public static final String METRIC_ENTITY_COUNT = ENTITY + "Count"; + public static final String METRIC_ENTITY_DELETED = ENTITY + "Deleted"; public static final String METRIC_TAGGED_ENTITIES = ENTITY + "Tagged"; public static final String METRIC_TAGS_PER_ENTITY = ENTITY + "Tags"; - public static final String METRIC_TAG_COUNT = TAG + "Count"; + public static final String METRIC_TAG_COUNT = TAG + "Count"; public static final String METRIC_ENTITIES_PER_TAG = TAG + "Entities"; - private static AtlasGraph atlasGraph; - private static Configuration configuration; + public static final String METRIC_COLLECTION_TIME = "collectionTime"; - public MetricsService() throws AtlasException { - atlasGraph = AtlasGraphProvider.getGraphInstance(); - configuration = ApplicationProperties.get(); + private static Configuration configuration = null; + private final AtlasGraph atlasGraph; + private final AtlasTypeRegistry atlasTypeRegistry; + private final int cacheTTLInSecs; + + private AtlasMetrics cachedMetrics = null; + private long cacheExpirationTime = 0; + + + @Inject + public MetricsService(AtlasTypeRegistry typeRegistry) throws AtlasException { + this(ApplicationProperties.get(), AtlasGraphProvider.getGraphInstance(), typeRegistry); + } + + @VisibleForTesting + MetricsService(Configuration configuration, AtlasGraph graph, AtlasTypeRegistry typeRegistry) { + MetricsService.configuration = configuration; + + atlasTypeRegistry = typeRegistry; + atlasGraph = graph; + cacheTTLInSecs = configuration != null ? configuration.getInt(METRIC_QUERY_CACHE_TTL, DEFAULT_CACHE_TTL_IN_SECS) + : DEFAULT_CACHE_TTL_IN_SECS; } @SuppressWarnings("unchecked") public AtlasMetrics getMetrics() { - AtlasMetrics metrics = new AtlasMetrics(); - for (MetricQuery metricQuery : MetricQuery.values()) { - try { - Object result = atlasGraph.executeGremlinScript(metricQuery.query, false); - if (result instanceof Number) { - metrics.addData(metricQuery.type, metricQuery.name, ((Number) result).intValue()); - } else if (result instanceof List) { - for (Map resultMap : (List<Map>) result) { - metrics.addData(metricQuery.type, (String) resultMap.get("key"), ((Number) resultMap.get("value")).intValue()); + if (!isCacheValid()) { + AtlasMetrics metrics = new AtlasMetrics(); + + for (MetricQuery metricQuery : MetricQuery.values()) { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Executing query: {}", metricQuery); } - } else { - LOG.warn("Unhandled return type {} for {}. Ignoring", result.getClass().getSimpleName(), metricQuery); + + if (metricQuery == MetricQuery.ENTITIES_PER_TYPE) { + Collection<String> entityDefNames = atlasTypeRegistry.getAllEntityDefNames(); + + for (String entityDefName : entityDefNames) { + String formattedQuery = String.format(metricQuery.query, entityDefName); + + executeGremlinQuery(metrics, metricQuery.type, entityDefName, formattedQuery); + } + } else { + executeGremlinQuery(metrics, metricQuery.type, metricQuery.name, metricQuery.query); + } + } catch (ScriptException e) { + LOG.error("Gremlin execution failed for metric {}", metricQuery, e); } - } catch (ScriptException e) { - LOG.error("Gremlin execution failed for metric {}", metricQuery, e); } + + long collectionTime = System.currentTimeMillis(); + + metrics.addData(GENERAL, METRIC_COLLECTION_TIME, collectionTime); + + this.cachedMetrics = metrics; + this.cacheExpirationTime = (collectionTime + cacheTTLInSecs * 1000); } - return metrics; + return cachedMetrics; + } + + private void executeGremlinQuery(AtlasMetrics metrics, String type, String name, String query) throws ScriptException { + Object result = atlasGraph.executeGremlinScript(query, false); + + if (result instanceof Number) { + metrics.addData(type, name, ((Number) result).intValue()); + } else if (result instanceof List) { + for (Map resultMap : (List<Map>) result) { + metrics.addData(type, (String) resultMap.get("key"), ((Number) resultMap.get("value")).intValue()); + } + } else { + String returnClassName = result != null ? result.getClass().getSimpleName() : "null"; + + LOG.warn("Unhandled return type {} for {}. Ignoring", returnClassName, query); + } + } + + private boolean isCacheValid() { + boolean valid = cachedMetrics != null && System.currentTimeMillis() < cacheExpirationTime; + + if (LOG.isDebugEnabled()) { + LOG.debug("cachedMetrics: {}", cachedMetrics != null); + LOG.debug("cacheExpirationTime: {}", cacheExpirationTime); + LOG.debug("valid: {}", valid); + } + + return valid; + } + + private static String getQuery(String type, String name, String defaultQuery) { + String ret = configuration != null ? configuration.getString(METRIC_QUERY_PREFIX + type + "." + name, defaultQuery) + : defaultQuery; + + if (LOG.isDebugEnabled()) { + LOG.debug("query for {}.{}: {}", type, name, ret); + } + + return ret; } /** @@ -92,35 +169,27 @@ public class MetricsService { * The default behavior is to read from the properties and override the statically type query if the configured * query is not blank/empty. */ - enum MetricQuery { + private enum MetricQuery { TYPE_COUNT(GENERAL, METRIC_TYPE_COUNT, "g.V().has('__type', 'typeSystem').filter({it.'__type.category'.name() != 'TRAIT'}).count()"), UNUSED_TYPE_COUNT(GENERAL, METRIC_TYPE_UNUSED_COUNT, "g.V('__type', 'typeSystem').filter({ it.'__type.category'.name() != 'TRAIT' && it.inE.count() == 0}).count()"), ENTITY_COUNT(GENERAL, METRIC_ENTITY_COUNT, "g.V().has('__superTypeNames', T.in, ['Referenceable']).count()"), TAGS_COUNT(GENERAL, METRIC_TAG_COUNT, "g.V().has('__type', 'typeSystem').filter({it.'__type.category'.name() == 'TRAIT'}).count()"), DELETED_ENTITY_COUNT(GENERAL, METRIC_ENTITY_DELETED, "g.V().has('__superTypeNames', T.in, ['Referenceable']).has('__status', 'DELETED').count()"), - ENTITIES_PER_TYPE(ENTITY, METRIC_TYPE_ENTITIES, "g.V().has('__type', 'typeSystem').has('__type.name').filter({it.'__type.category'.name() != 'TRAIT'}).transform{[key: it.'__type.name', value: it.inE.count()]}.dedup().toList()"), + ENTITIES_PER_TYPE(ENTITY, METRIC_TYPE_ENTITIES, "g.V().has('__typeName', T.in, ['%s']).count()"), TAGGED_ENTITIES(ENTITY, METRIC_TAGGED_ENTITIES, "g.V().has('__superTypeNames', T.in, ['Referenceable']).has('__traitNames').count()"), TAGS_PER_ENTITY(TAG, METRIC_TAGS_PER_ENTITY, "g.V().has('__superTypeNames', T.in, ['Referenceable']).has('__traitNames').transform{[ key: it.'Referenceable.qualifiedName', value: it.'__traitNames'.size()]}.dedup().toList()"), ; - private String type; - private String name; - private String query; - - private static String getQuery(String type, String name) { - String metricQueryKey = METRIC_QUERY_PREFIX + type + "." + name; - if (LOG.isDebugEnabled()) { - LOG.debug("Looking for configured query {}", metricQueryKey); - } - return configuration.getString(metricQueryKey, ""); - } + + private final String type; + private final String name; + private final String query; MetricQuery(String type, String name, String query) { - this.type = type; - this.name = name; - String configuredQuery = getQuery(type, name); - this.query = StringUtils.isNotEmpty(configuredQuery) ? configuredQuery : query; + this.type = type; + this.name = name; + this.query = MetricsService.getQuery(type, name, query); } @Override http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/92d02817/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java ---------------------------------------------------------------------- diff --git a/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java b/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java new file mode 100644 index 0000000..5d2e460 --- /dev/null +++ b/repository/src/test/java/org/apache/atlas/services/MetricsServiceTest.java @@ -0,0 +1,112 @@ +package org.apache.atlas.services; + +import org.apache.atlas.model.metrics.AtlasMetrics; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.commons.configuration.Configuration; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import javax.script.ScriptException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.*; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +public class MetricsServiceTest { + private Configuration mockConfig = mock(Configuration.class); + private AtlasTypeRegistry mockTypeRegistry = mock(AtlasTypeRegistry.class); + private AtlasGraph mockGraph = mock(AtlasGraph.class); + private MetricsService metricsService; + + private List<Map> mockMapList = new ArrayList<>(); + private Number mockCount = 10; + + @BeforeClass + public void init() throws ScriptException { + Map<String, Object> aMockMap = new HashMap<>(); + Map<String, Object> bMockMap = new HashMap<>(); + Map<String, Object> cMockMap = new HashMap<>(); + aMockMap.put("key", "a"); + aMockMap.put("value", 1); + + bMockMap.put("key", "b"); + bMockMap.put("value", 2); + + cMockMap.put("key", "c"); + cMockMap.put("value", 3); + mockMapList.add(aMockMap); + mockMapList.add(bMockMap); + mockMapList.add(cMockMap); + + when(mockConfig.getInt(anyString(), anyInt())).thenReturn(5); + assertEquals(mockConfig.getInt("test", 1), 5); + when(mockTypeRegistry.getAllEntityDefNames()).thenReturn(Arrays.asList("a", "b", "c")); + setupMockGraph(); + metricsService = new MetricsService(mockConfig, mockGraph, mockTypeRegistry); + } + + private void setupMockGraph() throws ScriptException { + if (mockGraph == null) mockGraph = mock(AtlasGraph.class); + when(mockGraph.executeGremlinScript(anyString(), eq(false))).thenAnswer(new Answer<Object>() { + @Override + public Object answer(InvocationOnMock invocationOnMock) throws Throwable { + if (((String)invocationOnMock.getArguments()[0]).contains("count()")) { + return mockCount; + } else { + return mockMapList; + } + } + }); + } + + @Test + public void testGetMetrics() throws InterruptedException, ScriptException { + assertNotNull(metricsService); + AtlasMetrics metrics = metricsService.getMetrics(); + assertNotNull(metrics); + Number aCount = metrics.getMetric("entity", "a"); + assertNotNull(aCount); + assertEquals(aCount, 10); + + Number bCount = metrics.getMetric("entity", "b"); + assertNotNull(bCount); + assertEquals(bCount, 10); + + Number cCount = metrics.getMetric("entity", "c"); + assertNotNull(cCount); + assertEquals(cCount, 10); + + Number aTags = metrics.getMetric("tag", "a"); + assertNotNull(aTags); + assertEquals(aTags, 1); + + Number bTags = metrics.getMetric("tag", "b"); + assertNotNull(bTags); + assertEquals(bTags, 2); + + Number cTags = metrics.getMetric("tag", "c"); + assertNotNull(cTags); + assertEquals(cTags, 3); + + verify(mockGraph, atLeastOnce()).executeGremlinScript(anyString(), anyBoolean()); + + // Subsequent call within the cache timeout window + metricsService.getMetrics(); + verifyZeroInteractions(mockGraph); + + // Now test the cache refresh + Thread.sleep(6000); + metricsService.getMetrics(); + verify(mockGraph, atLeastOnce()).executeGremlinScript(anyString(), anyBoolean()); + } +} \ No newline at end of file
