Repository: metron
Updated Branches:
  refs/heads/master 05d309f91 -> 575ba03b9


METRON-1880 Use Caffeine for Profiler Caching (nickwallen) closes 
apache/metron#1270


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/575ba03b
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/575ba03b
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/575ba03b

Branch: refs/heads/master
Commit: 575ba03b9d91d96bf5c2bd1dbee95681ae3131d0
Parents: 05d309f
Author: nickwallen <[email protected]>
Authored: Tue Nov 20 13:31:46 2018 -0500
Committer: nickallen <[email protected]>
Committed: Tue Nov 20 13:31:46 2018 -0500

----------------------------------------------------------------------
 .../metron-profiler-common/README.md            | 14 ++--
 metron-analytics/metron-profiler-common/pom.xml |  5 ++
 .../profiler/DefaultMessageDistributor.java     | 84 ++++++++++++--------
 .../profiler/DefaultMessageDistributorTest.java | 27 ++++---
 4 files changed, 79 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/575ba03b/metron-analytics/metron-profiler-common/README.md
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/README.md 
b/metron-analytics/metron-profiler-common/README.md
index 8f26aaf..fe4c2ed 100644
--- a/metron-analytics/metron-profiler-common/README.md
+++ b/metron-analytics/metron-profiler-common/README.md
@@ -28,7 +28,7 @@ The Profiler is a feature extraction mechanism that can 
generate a profile descr
 
 This is achieved by summarizing the telemetry data consumed by Metron over 
tumbling windows. A summary statistic is applied to the data received within a 
given window.  Collecting these values across many windows results in a time 
series that is useful for analysis.
 
-Any field contained within a message can be used to generate a profile.  A 
profile can even be produced by combining fields that originate in different 
data sources.  A user has considerable power to transform the data used in a 
profile by leveraging the Stellar language. 
+Any field contained within a message can be used to generate a profile.  A 
profile can even be produced by combining fields that originate in different 
data sources.  A user has considerable power to transform the data used in a 
profile by leveraging the Stellar language.
 
 There are three separate ports of the Profiler that share this common code 
base.
 * The [Storm Profiler](../metron-profiler-storm/README.md) builds low-latency 
profiles over streaming data sets.
@@ -58,12 +58,12 @@ Let's start with a simple example. The following profile 
maintains a count of th
       "profile": "hello-world",
       "foreach": "ip_src_addr",
       "init": {
-        "count": 0
+        "count": "0"
       },
       "update": {
         "count": "count + 1"
       },
-      "result": "count",
+      "result": "count"
     }
   ]
 }
@@ -321,7 +321,7 @@ It is important to note that the Profiler can persist any 
serializable Object, n
     ```
     $ source /etc/default/metron
     $ bin/stellar -z $ZOOKEEPER
-    
+
     [Stellar]>>> stats := PROFILE_GET( "example4", "10.0.0.1", 
PROFILE_FIXED(30, "MINUTES"))
     [org.apache.metron.common.math.stats.OnlineStatisticsProvider@79fe4ab9, 
...]
     ```
@@ -330,10 +330,10 @@ It is important to note that the Profiler can persist any 
serializable Object, n
     ```
     [Stellar]>>> aStat := GET_FIRST(stats)
     org.apache.metron.common.math.stats.OnlineStatisticsProvider@79fe4ab9
-    
+
     [Stellar]>>> STATS_MEAN(aStat)
     15979.0625
-    
+
     [Stellar]>>> STATS_PERCENTILE(aStat, 90)
     30310.958
     ```
@@ -341,7 +341,7 @@ It is important to note that the Profiler can persist any 
serializable Object, n
 1. Merge all of the profile measurements over the past 30 minutes into a 
single sketch and calculate the 90th percentile.
     ```
     [Stellar]>>> merged := STATS_MERGE( stats)
-    
+
     [Stellar]>>> STATS_PERCENTILE(merged, 90)
     29810.992
     ```

http://git-wip-us.apache.org/repos/asf/metron/blob/575ba03b/metron-analytics/metron-profiler-common/pom.xml
----------------------------------------------------------------------
diff --git a/metron-analytics/metron-profiler-common/pom.xml 
b/metron-analytics/metron-profiler-common/pom.xml
index 630709e..eff70c4 100644
--- a/metron-analytics/metron-profiler-common/pom.xml
+++ b/metron-analytics/metron-profiler-common/pom.xml
@@ -28,6 +28,11 @@
     </properties>
     <dependencies>
         <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+            <version>${global_caffeine_version}</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-auth</artifactId>
             <version>${global_hadoop_version}</version>

http://git-wip-us.apache.org/repos/asf/metron/blob/575ba03b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
 
b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
index b0cd63b..0e50467 100644
--- 
a/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
+++ 
b/metron-analytics/metron-profiler-common/src/main/java/org/apache/metron/profiler/DefaultMessageDistributor.java
@@ -20,24 +20,30 @@
 
 package org.apache.metron.profiler;
 
-import com.google.common.base.Ticker;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import com.github.benmanes.caffeine.cache.Ticker;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
 import org.apache.metron.stellar.dsl.Context;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import java.io.Serializable;
 import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
 
 import static java.lang.String.format;
 
@@ -99,7 +105,7 @@ public class DefaultMessageDistributor implements 
MessageDistributor, Serializab
           long periodDurationMillis,
           long profileTimeToLiveMillis,
           long maxNumberOfRoutes) {
-    this(periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes, 
Ticker.systemTicker());
+    this(periodDurationMillis, profileTimeToLiveMillis, maxNumberOfRoutes, 
Ticker.systemTicker(), Optional.empty());
   }
 
   /**
@@ -110,12 +116,14 @@ public class DefaultMessageDistributor implements 
MessageDistributor, Serializab
    * @param maxNumberOfRoutes The max number of unique routes to maintain.  
After this is exceeded, lesser
    *                          used routes will be evicted from the internal 
cache.
    * @param ticker The ticker used to drive time for the caches.  Only needs 
to be set for testing.
+   * @param cacheMaintenanceExecutor The executor responsible for running 
cache maintenance tasks. Only needed for testing.
    */
   public DefaultMessageDistributor(
           long periodDurationMillis,
           long profileTimeToLiveMillis,
           long maxNumberOfRoutes,
-          Ticker ticker) {
+          Ticker ticker,
+          Optional<Executor> cacheMaintenanceExecutor) {
 
     if(profileTimeToLiveMillis < periodDurationMillis) {
       throw new IllegalStateException(format(
@@ -126,22 +134,34 @@ public class DefaultMessageDistributor implements 
MessageDistributor, Serializab
     this.periodDurationMillis = periodDurationMillis;
 
     // build the cache of active profiles
-    this.activeCache = CacheBuilder
+    Caffeine<Integer, ProfileBuilder> activeCacheBuilder = Caffeine
             .newBuilder()
             .maximumSize(maxNumberOfRoutes)
             .expireAfterAccess(profileTimeToLiveMillis, TimeUnit.MILLISECONDS)
             .removalListener(new ActiveCacheRemovalListener())
-            .ticker(ticker)
-            .build();
+            .ticker(ticker);
+    if (cacheMaintenanceExecutor.isPresent()) {
+      activeCacheBuilder.executor(cacheMaintenanceExecutor.get());
+    }
+    if (LOG.isDebugEnabled()) {
+      activeCacheBuilder.recordStats();
+    }
+    this.activeCache = activeCacheBuilder.build();
 
     // build the cache of expired profiles
-    this.expiredCache = CacheBuilder
+    Caffeine<Integer, ProfileBuilder> expiredCacheBuilder = Caffeine
             .newBuilder()
             .maximumSize(maxNumberOfRoutes)
             .expireAfterWrite(profileTimeToLiveMillis, TimeUnit.MILLISECONDS)
             .removalListener(new ExpiredCacheRemovalListener())
-            .ticker(ticker)
-            .build();
+            .ticker(ticker);
+    if (cacheMaintenanceExecutor.isPresent()) {
+      expiredCacheBuilder.executor(cacheMaintenanceExecutor.get());
+    }
+    if (LOG.isDebugEnabled()) {
+      expiredCacheBuilder.recordStats();
+    }
+    this.expiredCache = expiredCacheBuilder.build();
   }
 
   /**
@@ -219,8 +239,9 @@ public class DefaultMessageDistributor implements 
MessageDistributor, Serializab
   private void cacheMaintenance() {
     activeCache.cleanUp();
     expiredCache.cleanUp();
-
-    LOG.debug("Cache maintenance complete: activeCacheSize={}, 
expiredCacheSize={}", activeCache.size(), expiredCache.size());
+    LOG.debug("Cache maintenance triggered: activeCacheStats={}, 
expiredCacheStats={}",
+            activeCache.stats().toString(),
+            expiredCache.stats().toString());
   }
 
   /**
@@ -256,14 +277,14 @@ public class DefaultMessageDistributor implements 
MessageDistributor, Serializab
   public ProfileBuilder getBuilder(MessageRoute route, Context context) throws 
ExecutionException {
     ProfileConfig profile = route.getProfileDefinition();
     String entity = route.getEntity();
-    return activeCache.get(
-            cacheKey(profile, entity),
-            () -> new DefaultProfileBuilder.Builder()
+    Function<Integer, ProfileBuilder> profileCreator = (k) ->
+            new DefaultProfileBuilder.Builder()
                     .withDefinition(profile)
                     .withEntity(entity)
                     .withPeriodDurationMillis(periodDurationMillis)
                     .withContext(context)
-                    .build());
+                    .build();
+    return activeCache.get(cacheKey(profile, entity), profileCreator);
   }
 
   /**
@@ -299,15 +320,13 @@ public class DefaultMessageDistributor implements 
MessageDistributor, Serializab
   private class ActiveCacheRemovalListener implements RemovalListener<Integer, 
ProfileBuilder>, Serializable {
 
     @Override
-    public void onRemoval(RemovalNotification<Integer, ProfileBuilder> 
notification) {
-
-      ProfileBuilder expired = notification.getValue();
+    public void onRemoval(@Nullable Integer key, @Nullable ProfileBuilder 
expired, @Nonnull RemovalCause cause) {
       LOG.warn("Profile expired from active cache; profile={}, entity={}",
               expired.getDefinition().getProfile(),
               expired.getEntity());
 
       // add the profile to the expired cache
-      expiredCache.put(notification.getKey(), expired);
+      expiredCache.put(key, expired);
     }
   }
 
@@ -317,23 +336,20 @@ public class DefaultMessageDistributor implements 
MessageDistributor, Serializab
   private class ExpiredCacheRemovalListener implements 
RemovalListener<Integer, ProfileBuilder>, Serializable {
 
     @Override
-    public void onRemoval(RemovalNotification<Integer, ProfileBuilder> 
notification) {
-
-      if(notification.wasEvicted()) {
-
+    public void onRemoval(@Nullable Integer key, @Nullable ProfileBuilder 
expired, @Nonnull RemovalCause cause) {
+      if(cause.wasEvicted()) {
         // the expired profile was NOT flushed in time
-        ProfileBuilder expired = notification.getValue();
-        LOG.warn("Expired profile NOT flushed before removal, some state lost; 
profile={}, entity={}",
+        LOG.warn("Expired profile NOT flushed before removal, some state lost; 
profile={}, entity={}, cause={}",
                 expired.getDefinition().getProfile(),
-                expired.getEntity());
+                expired.getEntity(),
+                cause);
 
       } else {
-
         // the expired profile was flushed successfully
-        ProfileBuilder expired = notification.getValue();
-        LOG.debug("Expired profile successfully flushed; profile={}, 
entity={}",
+        LOG.debug("Expired profile successfully flushed; profile={}, 
entity={}, cause={}",
                 expired.getDefinition().getProfile(),
-                expired.getEntity());
+                expired.getEntity(),
+                cause);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/metron/blob/575ba03b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
----------------------------------------------------------------------
diff --git 
a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
 
b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
index 48161e2..d1b7598 100644
--- 
a/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
+++ 
b/metron-analytics/metron-profiler-common/src/test/java/org/apache/metron/profiler/DefaultMessageDistributorTest.java
@@ -20,7 +20,8 @@
 
 package org.apache.metron.profiler;
 
-import com.google.common.base.Ticker;
+import com.github.benmanes.caffeine.cache.Ticker;
+import com.google.common.util.concurrent.MoreExecutors;
 import org.adrianwalker.multilinestring.Multiline;
 import org.apache.metron.common.configuration.profiler.ProfileConfig;
 import org.apache.metron.common.utils.JSONUtils;
@@ -32,6 +33,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 
 import static java.util.concurrent.TimeUnit.HOURS;
@@ -102,7 +104,9 @@ public class DefaultMessageDistributorTest {
     distributor = new DefaultMessageDistributor(
             periodDurationMillis,
             profileTimeToLiveMillis,
-            maxNumberOfRoutes);
+            maxNumberOfRoutes,
+            Ticker.systemTicker(),
+            Optional.of(MoreExecutors.sameThreadExecutor()));
   }
 
   /**
@@ -190,12 +194,13 @@ public class DefaultMessageDistributorTest {
     // setup
     ProfileConfig definition = createDefinition(profileOne);
     String entity = (String) messageOne.get("ip_src_addr");
-    MessageRoute route = new MessageRoute(definition, entity, messageOne, 
System.currentTimeMillis());
+    MessageRoute route = new MessageRoute(definition, entity, messageOne, 
ticker.read());
     distributor = new DefaultMessageDistributor(
             periodDurationMillis,
             profileTimeToLiveMillis,
             maxNumberOfRoutes,
-            ticker);
+            ticker,
+            Optional.of(MoreExecutors.sameThreadExecutor()));
 
     // distribute one message
     distributor.distribute(route, context);
@@ -220,12 +225,13 @@ public class DefaultMessageDistributorTest {
     // setup
     ProfileConfig definition = createDefinition(profileOne);
     String entity = (String) messageOne.get("ip_src_addr");
-    MessageRoute route = new MessageRoute(definition, entity, messageOne, 
System.currentTimeMillis());
+    MessageRoute route = new MessageRoute(definition, entity, messageOne, 
ticker.read());
     distributor = new DefaultMessageDistributor(
             periodDurationMillis,
             profileTimeToLiveMillis,
             maxNumberOfRoutes,
-            ticker);
+            ticker,
+            Optional.of(MoreExecutors.sameThreadExecutor()));
 
     // distribute one message
     distributor.distribute(route, context);
@@ -251,12 +257,13 @@ public class DefaultMessageDistributorTest {
     // setup
     ProfileConfig definition = createDefinition(profileOne);
     String entity = (String) messageOne.get("ip_src_addr");
-    MessageRoute route = new MessageRoute(definition, entity, messageOne, 
System.currentTimeMillis());
+    MessageRoute route = new MessageRoute(definition, entity, messageOne, 
ticker.read());
     distributor = new DefaultMessageDistributor(
             periodDurationMillis,
             profileTimeToLiveMillis,
             maxNumberOfRoutes,
-            ticker);
+            ticker,
+            Optional.of(MoreExecutors.sameThreadExecutor()));
 
     // distribute one message
     distributor.distribute(route, context);
@@ -278,7 +285,7 @@ public class DefaultMessageDistributorTest {
    * An implementation of Ticker that can be used to drive time
    * when testing the Guava caches.
    */
-  private class FixedTicker extends Ticker {
+  private class FixedTicker implements Ticker {
 
     /**
      * The time that will be reported.
@@ -298,7 +305,7 @@ public class DefaultMessageDistributorTest {
       this.timestampNanos += units.toNanos(time);
       return this;
     }
-
+    
     @Override
     public long read() {
       return this.timestampNanos;

Reply via email to