Repository: metron Updated Branches: refs/heads/master 5303c5ead -> c14c78692
METRON-1544 Flaky test: org.apache.metron.stellar.common.CachingStellarProcessorTest#testCaching (nickwallen) closes apache/metron#1015 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/c14c7869 Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/c14c7869 Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/c14c7869 Branch: refs/heads/master Commit: c14c7869228132ad6e6cae8b9914f669ac6b489d Parents: 5303c5e Author: nickwallen <[email protected]> Authored: Tue May 29 14:25:55 2018 -0400 Committer: nickallen <[email protected]> Committed: Tue May 29 14:25:55 2018 -0400 ---------------------------------------------------------------------- .../stellar/common/CachingStellarProcessor.java | 141 +++++++++++--- .../common/CachingStellarProcessorTest.java | 195 +++++++++++++------ 2 files changed, 252 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/c14c7869/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java ---------------------------------------------------------------------- diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java index 36e6579..19de14e 100644 --- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java +++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/CachingStellarProcessor.java @@ -19,11 +19,16 @@ package org.apache.metron.stellar.common; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; +import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.metron.stellar.common.utils.ConversionUtils; import org.apache.metron.stellar.dsl.Context; import org.apache.metron.stellar.dsl.VariableResolver; import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver; +import org.slf4j.LoggerFactory; +import java.lang.invoke.MethodHandles; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -35,12 +40,38 @@ import java.util.concurrent.TimeUnit; * LFU cache. */ public class CachingStellarProcessor extends StellarProcessor { + + private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static ThreadLocal<Map<String, Set<String>> > variableCache = ThreadLocal.withInitial(() -> new HashMap<>()); + + /** + * A property that defines the maximum cache size. + */ public static String MAX_CACHE_SIZE_PARAM = "stellar.cache.maxSize"; + + /** + * A property that defines the max time in minutes that elements are retained in the cache. + */ public static String MAX_TIME_RETAIN_PARAM = "stellar.cache.maxTimeRetain"; + /** + * A property that defines if cache usage stats should be recorded. + */ + public static String RECORD_STATS = "stellar.cache.record.stats"; + + /** + * The cache key is based on the expression and input values. + */ public static class Key { + + /** + * The expression to execute. + */ private String expression; + + /** + * The variables that serve as input to the expression. + */ private Map<String, Object> input; public Key(String expression, Map<String, Object> input) { @@ -58,59 +89,98 @@ public class CachingStellarProcessor extends StellarProcessor { @Override public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - Key key = (Key) o; + if (this == o) { + return true; + } - if (getExpression() != null ? !getExpression().equals(key.getExpression()) : key.getExpression() != null) + if (o == null || getClass() != o.getClass()) { return false; - return getInput() != null ? getInput().equals(key.getInput()) : key.getInput() == null; + } + Key key = (Key) o; + return new EqualsBuilder() + .append(expression, key.expression) + .append(input, key.input) + .isEquals(); } @Override public int hashCode() { - int result = getExpression() != null ? getExpression().hashCode() : 0; - result = 31 * result + (getInput() != null ? getInput().hashCode() : 0); - return result; + return new HashCodeBuilder(17, 37) + .append(expression) + .append(input) + .toHashCode(); } - } + @Override + public String toString() { + return new ToStringBuilder(this) + .append("expression", expression) + .append("input", input) + .toString(); + } + } /** - * Parses and evaluates the given Stellar expression, {@code expression}. Results will be taken from a cache if possible. + * Parses and evaluates the given Stellar expression, {@code expression}. Results will be taken + * from a cache if possible. * - * @param expression The Stellar expression to parse and evaluate. - * @param variableResolver The {@link VariableResolver} to determine values of variables used in the Stellar expression, {@code expression}. - * @param functionResolver The {@link FunctionResolver} to determine values of functions used in the Stellar expression, {@code expression}. - * @param context The context used during validation. + * @param expression The Stellar expression to parse and evaluate. + * @param variableResolver The {@link VariableResolver} to determine values of variables used in + * the Stellar expression, {@code expression}. + * @param functionResolver The {@link FunctionResolver} to determine values of functions used in + * the Stellar expression, {@code expression}. + * @param context The context used during validation. * @return The value of the evaluated Stellar expression, {@code expression}. */ @Override - public Object parse(String expression, VariableResolver variableResolver, FunctionResolver functionResolver, Context context) { + public Object parse( + String expression, + VariableResolver variableResolver, + FunctionResolver functionResolver, + Context context) { + Optional<Object> cacheOpt = context.getCapability(Context.Capabilities.CACHE, false); if(cacheOpt.isPresent()) { + + // use the cache Cache<Key, Object> cache = (Cache<Key, Object>) cacheOpt.get(); Key k = toKey(expression, variableResolver); return cache.get(k, x -> parseUncached(x.expression, variableResolver, functionResolver, context)); - } - else { + + } else { + + LOG.debug("No cache present."); return parseUncached(expression, variableResolver, functionResolver, context); } } protected Object parseUncached(String expression, VariableResolver variableResolver, FunctionResolver functionResolver, Context context) { + LOG.debug("Executing Stellar; expression={}", expression); return super.parse(expression, variableResolver, functionResolver, context); } - private Key toKey(String expression, VariableResolver resolver) { + /** + * Create a cache key using the expression and all variables used by that expression. + * + * @param expression The Stellar expression. + * @param resolver The variable resolver. + * @return A key with which to do a cache lookup. + */ + protected Key toKey(String expression, VariableResolver resolver) { + + // fetch only the variables used in the expression Set<String> variablesUsed = variableCache.get().computeIfAbsent(expression, this::variablesUsed); + + // resolve each of the variables used by the expression Map<String, Object> input = new HashMap<>(); for(String v : variablesUsed) { input.computeIfAbsent(v, resolver::resolve); } - return new Key(expression, input); + + Key cacheKey = new Key(expression, input); + LOG.debug("Created cache key; {}", cacheKey); + return cacheKey; } /** @@ -119,18 +189,39 @@ public class CachingStellarProcessor extends StellarProcessor { * @return A cache. */ public static Cache<Key, Object> createCache(Map<String, Object> config) { + + // the cache configuration is required if(config == null) { + LOG.debug("Cannot create cache; missing cache configuration"); return null; } + + // max cache size is required Long maxSize = getParam(config, MAX_CACHE_SIZE_PARAM, null, Long.class); + if(maxSize == null || maxSize <= 0) { + LOG.error("Cannot create cache; missing or invalid configuration; {} = {}", MAX_CACHE_SIZE_PARAM, maxSize); + return null; + } + + // max time retain is required Integer maxTimeRetain = getParam(config, MAX_TIME_RETAIN_PARAM, null, Integer.class); - if(maxSize == null || maxTimeRetain == null || maxSize <= 0 || maxTimeRetain <= 0) { + if(maxTimeRetain == null || maxTimeRetain <= 0) { + LOG.error("Cannot create cache; missing or invalid configuration; {} = {}", MAX_TIME_RETAIN_PARAM, maxTimeRetain); return null; } - return Caffeine.newBuilder() - .maximumSize(maxSize) - .expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES) - .build(); + + Caffeine<Object, Object> cache = Caffeine + .newBuilder() + .maximumSize(maxSize) + .expireAfterWrite(maxTimeRetain, TimeUnit.MINUTES); + + // record stats is optional + Boolean recordStats = getParam(config, RECORD_STATS, false, Boolean.class); + if(recordStats) { + cache.recordStats(); + } + + return cache.build(); } private static <T> T getParam(Map<String, Object> config, String key, T defaultVal, Class<T> clazz) { http://git-wip-us.apache.org/repos/asf/metron/blob/c14c7869/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/common/CachingStellarProcessorTest.java ---------------------------------------------------------------------- diff --git a/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/common/CachingStellarProcessorTest.java b/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/common/CachingStellarProcessorTest.java index 94421de..1690236 100644 --- a/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/common/CachingStellarProcessorTest.java +++ b/metron-stellar/stellar-common/src/test/java/org/apache/metron/stellar/common/CachingStellarProcessorTest.java @@ -22,83 +22,160 @@ import com.google.common.collect.ImmutableMap; import org.apache.metron.stellar.dsl.Context; import org.apache.metron.stellar.dsl.MapVariableResolver; import org.apache.metron.stellar.dsl.StellarFunctions; -import org.apache.metron.stellar.dsl.VariableResolver; -import org.apache.metron.stellar.dsl.functions.resolver.FunctionResolver; -import org.junit.Assert; +import org.junit.Before; import org.junit.Test; +import org.slf4j.LoggerFactory; +import java.lang.invoke.MethodHandles; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; public class CachingStellarProcessorTest { + private static final org.slf4j.Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static Map<String, Object> fields = new HashMap<String, Object>() {{ put("name", "blah"); }}; + private CachingStellarProcessor processor; + private Cache<CachingStellarProcessor.Key, Object> cache; + private Context contextWithCache; + + @Before + public void setup() throws Exception { + + // create the cache + Map<String, Object> cacheConfig = ImmutableMap.of( + CachingStellarProcessor.MAX_CACHE_SIZE_PARAM, 2, + CachingStellarProcessor.MAX_TIME_RETAIN_PARAM, 10, + CachingStellarProcessor.RECORD_STATS, true + ); + cache = CachingStellarProcessor.createCache(cacheConfig); + contextWithCache = new Context.Builder() + .with(Context.Capabilities.CACHE, () -> cache) + .build(); + + // create the object to test + processor = new CachingStellarProcessor(); + } + + /** + * Running the same expression multiple times should hit the cache. + */ + @Test + public void testWithCache() { + + Object result = execute("TO_UPPER(name)", contextWithCache); + assertEquals("BLAH", result); + assertEquals(1, cache.stats().requestCount()); + assertEquals(1, cache.stats().missCount()); + assertEquals(0, cache.stats().hitCount()); + + result = execute("TO_UPPER(name)", contextWithCache); + assertEquals("BLAH", result); + assertEquals(2, cache.stats().requestCount()); + assertEquals(1, cache.stats().missCount()); + assertEquals(1, cache.stats().hitCount()); + + result = execute("TO_UPPER(name)", contextWithCache); + assertEquals("BLAH", result); + assertEquals(3, cache.stats().requestCount()); + assertEquals(1, cache.stats().missCount()); + assertEquals(2, cache.stats().hitCount()); + } + + /** + * The processor should work, even if no cache is present in the execution context. + */ + @Test + public void testNoCache() throws Exception { + + // the execution context does not contain a cache + Context contextNoCache = Context.EMPTY_CONTEXT(); + + assertEquals("BLAH", execute("TO_UPPER(name)", contextNoCache)); + assertEquals("BLAH", execute("TO_UPPER(name)", contextNoCache)); + } + + @Test + public void testInvalidMaxCacheSize() { + Map<String, Object> cacheConfig = ImmutableMap.of( + CachingStellarProcessor.MAX_CACHE_SIZE_PARAM, -1, + CachingStellarProcessor.MAX_TIME_RETAIN_PARAM, 10 + ); + cache = CachingStellarProcessor.createCache(cacheConfig); + assertNull(cache); + } + @Test - public void testNoCaching() throws Exception { - //no caching, so every expression is a cache miss. - Assert.assertEquals(2, countMisses(2, Context.EMPTY_CONTEXT(), "TO_UPPER(name)")); - //Ensure the correct result is returned. - Assert.assertEquals("BLAH", evaluateExpression(Context.EMPTY_CONTEXT(), "TO_UPPER(name)")); + public void testMissingMaxCacheSize() { + Map<String, Object> cacheConfig = ImmutableMap.of( + CachingStellarProcessor.MAX_TIME_RETAIN_PARAM, 10 + ); + cache = CachingStellarProcessor.createCache(cacheConfig); + assertNull(cache); } @Test - public void testCaching() throws Exception { - Cache<CachingStellarProcessor.Key, Object> cache = CachingStellarProcessor.createCache( - ImmutableMap.of(CachingStellarProcessor.MAX_CACHE_SIZE_PARAM, 2 - ,CachingStellarProcessor.MAX_TIME_RETAIN_PARAM, 10 - ) - ); - Context context = new Context.Builder() - .with( Context.Capabilities.CACHE , () -> cache ) - .build(); - //running the same expression twice should hit the cache on the 2nd time and only yield one miss - Assert.assertEquals(1, countMisses(2, context, "TO_UPPER(name)")); - - //Ensure the correct result is returned. - Assert.assertEquals("BLAH", evaluateExpression(context, "TO_UPPER(name)")); - - //running the same expression 20 more times should pull from the cache - Assert.assertEquals(0, countMisses(20, context, "TO_UPPER(name)")); - - //Now we are running 4 distinct operations with a cache size of 2. The cache has 1 element in it before we start: - // TO_LOWER(name) - miss (brand new), cache is full - // TO_UPPER(name) - hit, cache is full - // TO_UPPER('foo') - miss (brand new), cache is still full, but TO_LOWER is evicted as the least frequently used - // JOIN... - miss (brand new), cache is still full, but TO_UPPER('foo') is evicted as the least frequently used - //this pattern repeats a 2nd time to add another 3 cache misses, totalling 6. - Assert.assertEquals(6, countMisses(2, context, "TO_LOWER(name)", "TO_UPPER(name)", "TO_UPPER('foo')", "JOIN([name, 'blah'], ',')")); + public void testInvalidMaxTimeRetain() { + Map<String, Object> cacheConfig = ImmutableMap.of( + CachingStellarProcessor.MAX_CACHE_SIZE_PARAM, 10, + CachingStellarProcessor.MAX_TIME_RETAIN_PARAM, -2 + ); + cache = CachingStellarProcessor.createCache(cacheConfig); + assertNull(cache); } - private Object evaluateExpression(Context context, String expression) { - StellarProcessor processor = new CachingStellarProcessor(); - return processor.parse(expression - , new MapVariableResolver(fields) - , StellarFunctions.FUNCTION_RESOLVER() - , context); + @Test + public void testMissingMaxTimeRetain() { + Map<String, Object> cacheConfig = ImmutableMap.of( + CachingStellarProcessor.MAX_CACHE_SIZE_PARAM, 10 + ); + cache = CachingStellarProcessor.createCache(cacheConfig); + assertNull(cache); } - private int countMisses(int numRepetition, Context context, String... expressions) { - AtomicInteger numExpressions = new AtomicInteger(0); - StellarProcessor processor = new CachingStellarProcessor() { - @Override - protected Object parseUncached(String expression, VariableResolver variableResolver, FunctionResolver functionResolver, Context context) { - numExpressions.incrementAndGet(); - return super.parseUncached(expression, variableResolver, functionResolver, context); - } - }; - - for(int i = 0;i < numRepetition;++i) { - for(String expression : expressions) { - processor.parse(expression - , new MapVariableResolver(fields) - , StellarFunctions.FUNCTION_RESOLVER() - , context); - } - } - return numExpressions.get(); + /** + * The cache should continue to hit, even if variables not used in the cached expression change. + */ + @Test + public void testUnrelatedVariableChange() { + + // expect miss + Object result = execute("TO_UPPER(name)", contextWithCache); + assertEquals("BLAH", result); + assertEquals(1, cache.stats().requestCount()); + assertEquals(1, cache.stats().missCount()); + assertEquals(0, cache.stats().hitCount()); + + // add an irrelevant variable that is not used in the expression + fields.put("unrelated_var_1", "true"); + fields.put("unrelated_var_2", 22); + + // still expect a hit + result = execute("TO_UPPER(name)", contextWithCache); + assertEquals("BLAH", result); + assertEquals(2, cache.stats().requestCount()); + assertEquals(1, cache.stats().missCount()); + assertEquals(1, cache.stats().hitCount()); + + } + + /** + * Execute each expression. + * @param expression The expression to execute. + */ + private Object execute(String expression, Context context) { + + Object result = processor.parse( + expression, + new MapVariableResolver(fields), + StellarFunctions.FUNCTION_RESOLVER(), + context); + return result; } }
