Repository: metron Updated Branches: refs/heads/master 05d309f91 -> 575ba03b9
METRON-1880 Use Caffeine for Profiler Caching (nickwallen) closes apache/metron#1270 Project: http://git-wip-us.apache.org/repos/asf/metron/repo Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/575ba03b Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/575ba03b Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/575ba03b Branch: refs/heads/master Commit: 575ba03b9d91d96bf5c2bd1dbee95681ae3131d0 Parents: 05d309f Author: nickwallen <[email protected]> Authored: Tue Nov 20 13:31:46 2018 -0500 Committer: nickallen <[email protected]> Committed: Tue Nov 20 13:31:46 2018 -0500 ---------------------------------------------------------------------- .../metron-profiler-common/README.md | 14 ++-- metron-analytics/metron-profiler-common/pom.xml | 5 ++ .../profiler/DefaultMessageDistributor.java | 84 ++++++++++++-------- .../profiler/DefaultMessageDistributorTest.java | 27 ++++--- 4 files changed, 79 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/metron/blob/575ba03b/metron-analytics/metron-profiler-common/README.md ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/README.md b/metron-analytics/metron-profiler-common/README.md index 8f26aaf..fe4c2ed 100644 --- a/metron-analytics/metron-profiler-common/README.md +++ b/metron-analytics/metron-profiler-common/README.md @@ -28,7 +28,7 @@ The Profiler is a feature extraction mechanism that can generate a profile descr This is achieved by summarizing the telemetry data consumed by Metron over tumbling windows. A summary statistic is applied to the data received within a given window. Collecting these values across many windows result in a time series that is useful for analysis. -Any field contained within a message can be used to generate a profile. A profile can even be produced by combining fields that originate in different data sources. A user has considerable power to transform the data used in a profile by leveraging the Stellar language. +Any field contained within a message can be used to generate a profile. A profile can even be produced by combining fields that originate in different data sources. A user has considerable power to transform the data used in a profile by leveraging the Stellar language. There are three separate ports of the Profiler that share this common code base. * The [Storm Profiler](../metron-profiler-storm/README.md) builds low-latency profiles over streaming data sets. @@ -58,12 +58,12 @@ Let's start with a simple example. The following profile maintains a count of th "profile": "hello-world", "foreach": "ip_src_addr", "init": { - "count": 0 + "count": "0" }, "update": { "count": "count + 1" }, - "result": "count", + "result": "count" } ] } @@ -321,7 +321,7 @@ It is important to note that the Profiler can persist any serializable Object, n ``` $ source /etc/default/metron $ bin/stellar -z $ZOOKEEPER - + [Stellar]>>> stats := PROFILE_GET( "example4", "10.0.0.1", PROFILE_FIXED(30, "MINUTES")) [org.apache.metron.common.math.stats.OnlineStatisticsProvider@79fe4ab9, ...] ``` @@ -330,10 +330,10 @@ It is important to note that the Profiler can persist any serializable Object, n ``` [Stellar]>>> aStat := GET_FIRST(stats) org.apache.metron.common.math.stats.OnlineStatisticsProvider@79fe4ab9 - + [Stellar]>>> STATS_MEAN(aStat) 15979.0625 - + [Stellar]>>> STATS_PERCENTILE(aStat, 90) 30310.958 ``` @@ -341,7 +341,7 @@ It is important to note that the Profiler can persist any serializable Object, n 1. Merge all of the profile measurements over the past 30 minutes into a single sketch and calculate the 90th percentile. ``` [Stellar]>>> merged := STATS_MERGE( stats) - + [Stellar]>>> STATS_PERCENTILE(merged, 90) 29810.992 ``` http://git-wip-us.apache.org/repos/asf/metron/blob/575ba03b/metron-analytics/metron-profiler-common/pom.xml ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/pom.xml b/metron-analytics/metron-profiler-common/pom.xml index 630709e..eff70c4 100644 --- a/metron-analytics/metron-profiler-common/pom.xml +++ b/metron-analytics/metron-profiler-common/pom.xml @@ -28,6 +28,11 @@ </properties> <dependencies> <dependency> + <groupId>com.github.ben-manes.caffeine</groupId> + <artifactId>caffeine</artifactId> + <version>${global_caffeine_version}</version> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-auth</artifactId> <version>${global_hadoop_version}</version> http://git-wip-us.apache.org/repos/asf/metron/blob/575ba03b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java index b0cd63b..0e50467 100644 --- a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java +++ b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java @@ -20,24 +20,30 @@ package org.apache.metron.profiler; -import com.google.common.base.Ticker; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.RemovalListener; +import com.github.benmanes.caffeine.cache.Ticker; +import com.google.common.util.concurrent.MoreExecutors; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.metron.common.configuration.profiler.ProfileConfig; import org.apache.metron.stellar.dsl.Context; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.Serializable; import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import static java.lang.String.format; @@ -99,7 +105,7 @@ public class DefaultMessageDistributor implements MessageDistributor, Serializab long periodDurationMillis, long profileTimeToLiveMillis, long maxNumberOfRoutes) { - this(periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes, Ticker.systemTicker()); + this(periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes, Ticker.systemTicker(), Optional.empty()); } /** @@ -110,12 +116,14 @@ public class DefaultMessageDistributor implements MessageDistributor, Serializab * @param maxNumberOfRoutes The max number of unique routes to maintain. After this is exceeded, lesser * used routes will be evicted from the internal cache. * @param ticker The ticker used to drive time for the caches. Only needs set for testing. + * @param cacheMaintenanceExecutor The executor responsible for running cache maintenance tasks. Only needed for testing. */ public DefaultMessageDistributor( long periodDurationMillis, long profileTimeToLiveMillis, long maxNumberOfRoutes, - Ticker ticker) { + Ticker ticker, + Optional<Executor> cacheMaintenanceExecutor) { if(profileTimeToLiveMillis < periodDurationMillis) { throw new IllegalStateException(format( @@ -126,22 +134,34 @@ public class DefaultMessageDistributor implements MessageDistributor, Serializab this.periodDurationMillis = periodDurationMillis; // build the cache of active profiles - this.activeCache = CacheBuilder + Caffeine<Integer, ProfileBuilder> activeCacheBuilder = Caffeine .newBuilder() .maximumSize(maxNumberOfRoutes) .expireAfterAccess(profileTimeToLiveMillis, TimeUnit.MILLISECONDS) .removalListener(new ActiveCacheRemovalListener()) - .ticker(ticker) - .build(); + .ticker(ticker); + if (cacheMaintenanceExecutor.isPresent()) { + activeCacheBuilder.executor(cacheMaintenanceExecutor.get()); + } + if (LOG.isDebugEnabled()) { + activeCacheBuilder.recordStats(); + } + this.activeCache = activeCacheBuilder.build(); // build the cache of expired profiles - this.expiredCache = CacheBuilder + Caffeine<Integer, ProfileBuilder> expiredCacheBuilder = Caffeine .newBuilder() .maximumSize(maxNumberOfRoutes) .expireAfterWrite(profileTimeToLiveMillis, TimeUnit.MILLISECONDS) .removalListener(new ExpiredCacheRemovalListener()) - .ticker(ticker) - .build(); + .ticker(ticker); + if (cacheMaintenanceExecutor.isPresent()) { + expiredCacheBuilder.executor(cacheMaintenanceExecutor.get()); + } + if (LOG.isDebugEnabled()) { + expiredCacheBuilder.recordStats(); + } + this.expiredCache = expiredCacheBuilder.build(); } /** @@ -219,8 +239,9 @@ public class DefaultMessageDistributor implements MessageDistributor, Serializab private void cacheMaintenance() { activeCache.cleanUp(); expiredCache.cleanUp(); - - LOG.debug("Cache maintenance complete: activeCacheSize={}, expiredCacheSize={}", activeCache.size(), expiredCache.size()); + LOG.debug("Cache maintenance triggered: activeCacheStats={}, expiredCacheStats={}", + activeCache.stats().toString(), + expiredCache.stats().toString()); } /** @@ -256,14 +277,14 @@ public class DefaultMessageDistributor implements MessageDistributor, Serializab public ProfileBuilder getBuilder(MessageRoute route, Context context) throws ExecutionException { ProfileConfig profile = route.getProfileDefinition(); String entity = route.getEntity(); - return activeCache.get( - cacheKey(profile, entity), - () -> new DefaultProfileBuilder.Builder() + Function<Integer, ProfileBuilder> profileCreator = (k) -> + new DefaultProfileBuilder.Builder() .withDefinition(profile) .withEntity(entity) .withPeriodDurationMillis(periodDurationMillis) .withContext(context) - .build()); + .build(); + return activeCache.get(cacheKey(profile, entity), profileCreator); } /** @@ -299,15 +320,13 @@ public class DefaultMessageDistributor implements MessageDistributor, Serializab private class ActiveCacheRemovalListener implements RemovalListener<Integer, ProfileBuilder>, Serializable { @Override - public void onRemoval(RemovalNotification<Integer, ProfileBuilder> notification) { - - ProfileBuilder expired = notification.getValue(); + public void onRemoval(@Nullable Integer key, @Nullable ProfileBuilder expired, @Nonnull RemovalCause cause) { LOG.warn("Profile expired from active cache; profile={}, entity={}", expired.getDefinition().getProfile(), expired.getEntity()); // add the profile to the expired cache - expiredCache.put(notification.getKey(), expired); + expiredCache.put(key, expired); } } @@ -317,23 +336,20 @@ public class DefaultMessageDistributor implements MessageDistributor, Serializab private class ExpiredCacheRemovalListener implements RemovalListener<Integer, ProfileBuilder>, Serializable { @Override - public void onRemoval(RemovalNotification<Integer, ProfileBuilder> notification) { - - if(notification.wasEvicted()) { - + public void onRemoval(@Nullable Integer key, @Nullable ProfileBuilder expired, @Nonnull RemovalCause cause) { + if(cause.wasEvicted()) { // the expired profile was NOT flushed in time - ProfileBuilder expired = notification.getValue(); - LOG.warn("Expired profile NOT flushed before removal, some state lost; profile={}, entity={}", + LOG.warn("Expired profile NOT flushed before removal, some state lost; profile={}, entity={}, cause={}", expired.getDefinition().getProfile(), - expired.getEntity()); + expired.getEntity(), + cause); } else { - // the expired profile was flushed successfully - ProfileBuilder expired = notification.getValue(); - LOG.debug("Expired profile successfully flushed; profile={}, entity={}", + LOG.debug("Expired profile successfully flushed; profile={}, entity={}, cause={}", expired.getDefinition().getProfile(), - expired.getEntity()); + expired.getEntity(), + cause); } } } http://git-wip-us.apache.org/repos/asf/metron/blob/575ba03b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java ---------------------------------------------------------------------- diff --git a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java index 48161e2..d1b7598 100644 --- a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java +++ b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java @@ -20,7 +20,8 @@ package org.apache.metron.profiler; -import com.google.common.base.Ticker; +import com.github.benmanes.caffeine.cache.Ticker; +import com.google.common.util.concurrent.MoreExecutors; import org.adrianwalker.multilinestring.Multiline; import org.apache.metron.common.configuration.profiler.ProfileConfig; import org.apache.metron.common.utils.JSONUtils; @@ -32,6 +33,7 @@ import org.junit.Test; import java.io.IOException; import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeUnit; import static java.util.concurrent.TimeUnit.HOURS; @@ -102,7 +104,9 @@ public class DefaultMessageDistributorTest { distributor = new DefaultMessageDistributor( periodDurationMillis, profileTimeToLiveMillis, - maxNumberOfRoutes); + maxNumberOfRoutes, + Ticker.systemTicker(), + Optional.of(MoreExecutors.sameThreadExecutor())); } /** @@ -190,12 +194,13 @@ public class DefaultMessageDistributorTest { // setup ProfileConfig definition = createDefinition(profileOne); String entity = (String) messageOne.get("ip_src_addr"); - MessageRoute route = new MessageRoute(definition, entity, messageOne, System.currentTimeMillis()); + MessageRoute route = new MessageRoute(definition, entity, messageOne, ticker.read()); distributor = new DefaultMessageDistributor( periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes, - ticker); + ticker, + Optional.of(MoreExecutors.sameThreadExecutor())); // distribute one message distributor.distribute(route, context); @@ -220,12 +225,13 @@ public class DefaultMessageDistributorTest { // setup ProfileConfig definition = createDefinition(profileOne); String entity = (String) messageOne.get("ip_src_addr"); - MessageRoute route = new MessageRoute(definition, entity, messageOne, System.currentTimeMillis()); + MessageRoute route = new MessageRoute(definition, entity, messageOne, ticker.read()); distributor = new DefaultMessageDistributor( periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes, - ticker); + ticker, + Optional.of(MoreExecutors.sameThreadExecutor())); // distribute one message distributor.distribute(route, context); @@ -251,12 +257,13 @@ public class DefaultMessageDistributorTest { // setup ProfileConfig definition = createDefinition(profileOne); String entity = (String) messageOne.get("ip_src_addr"); - MessageRoute route = new MessageRoute(definition, entity, messageOne, System.currentTimeMillis()); + MessageRoute route = new MessageRoute(definition, entity, messageOne, ticker.read()); distributor = new DefaultMessageDistributor( periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes, - ticker); + ticker, + Optional.of(MoreExecutors.sameThreadExecutor())); // distribute one message distributor.distribute(route, context); @@ -278,7 +285,7 @@ public class DefaultMessageDistributorTest { * An implementation of Ticker that can be used to drive time * when testing the Guava caches. */ - private class FixedTicker extends Ticker { + private class FixedTicker implements Ticker { /** * The time that will be reported. @@ -298,7 +305,7 @@ public class DefaultMessageDistributorTest { this.timestampNanos += units.toNanos(time); return this; } - + @Override public long read() { return this.timestampNanos;
