Repository: cassandra
Updated Branches:
  refs/heads/trunk 95c4320ba -> c607d7641


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c607d764/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java b/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java
index 51f6569..052830a 100644
--- a/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java
@@ -20,16 +20,17 @@ package org.apache.cassandra.metrics;
 import java.net.InetAddress;
 import java.util.Map.Entry;
 
+import com.google.common.util.concurrent.MoreExecutors;
+
 import com.codahale.metrics.Counter;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.utils.UUIDGen;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-
 import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
 
 /**
@@ -42,31 +43,23 @@ public class HintedHandoffMetrics
     private static final MetricNameFactory factory = new DefaultNameFactory("HintedHandOffManager");
 
     /** Total number of hints which are not stored, This is not a cache. */
-    private final LoadingCache<InetAddress, DifferencingCounter> notStored = CacheBuilder.newBuilder().build(new CacheLoader<InetAddress, DifferencingCounter>()
-    {
-        public DifferencingCounter load(InetAddress address)
-        {
-            return new DifferencingCounter(address);
-        }
-    });
+    private final LoadingCache<InetAddress, DifferencingCounter> notStored = Caffeine.newBuilder()
+         .executor(MoreExecutors.directExecutor())
+         .build(DifferencingCounter::new);
 
     /** Total number of hints that have been created, This is not a cache. */
-    private final LoadingCache<InetAddress, Counter> createdHintCounts = CacheBuilder.newBuilder().build(new CacheLoader<InetAddress, Counter>()
-    {
-        public Counter load(InetAddress address)
-        {
-            return Metrics.counter(factory.createMetricName("Hints_created-" + address.getHostAddress().replace(':', '.')));
-        }
-    });
+    private final LoadingCache<InetAddress, Counter> createdHintCounts = Caffeine.newBuilder()
+         .executor(MoreExecutors.directExecutor())
+         .build(address -> Metrics.counter(factory.createMetricName("Hints_created-" + address.getHostAddress().replace(':', '.'))));
 
     public void incrCreatedHints(InetAddress address)
     {
-        createdHintCounts.getUnchecked(address).inc();
+        createdHintCounts.get(address).inc();
     }
 
     public void incrPastWindow(InetAddress address)
     {
-        notStored.getUnchecked(address).mark();
+        notStored.get(address).mark();
     }
 
     public void log()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c607d764/src/java/org/apache/cassandra/net/RateBasedBackPressure.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/RateBasedBackPressure.java b/src/java/org/apache/cassandra/net/RateBasedBackPressure.java
index 1dae243..64685b0 100644
--- a/src/java/org/apache/cassandra/net/RateBasedBackPressure.java
+++ b/src/java/org/apache/cassandra/net/RateBasedBackPressure.java
@@ -20,13 +20,13 @@ package org.apache.cassandra.net;
 import java.net.InetAddress;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.RateLimiter;
 
 import org.slf4j.Logger;
@@ -63,7 +63,10 @@ public class RateBasedBackPressure implements BackPressureStrategy<RateBasedBack
     protected final long windowSize;
 
     private final Cache<Set<RateBasedBackPressureState>, IntervalRateLimiter> rateLimiters =
-            CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS).build();
+            Caffeine.newBuilder()
+                    .expireAfterAccess(1, TimeUnit.HOURS)
+                    .executor(MoreExecutors.directExecutor())
+                    .build();
 
     enum Flow
     {
@@ -217,42 +220,35 @@ public class RateBasedBackPressure implements BackPressureStrategy<RateBasedBack
         // Now find the rate limiter corresponding to the replica group represented by these back-pressure states:
         if (!states.isEmpty())
         {
-            try
-            {
-                // Get the rate limiter:
-                IntervalRateLimiter rateLimiter = rateLimiters.get(states, () -> new IntervalRateLimiter(timeSource));
+            // Get the rate limiter:
+            IntervalRateLimiter rateLimiter = rateLimiters.get(states, key -> new IntervalRateLimiter(timeSource));
 
-                // If the back-pressure was updated and we acquire the interval lock for the rate limiter of this group:
-                if (isUpdated && rateLimiter.tryIntervalLock(windowSize))
+            // If the back-pressure was updated and we acquire the interval lock for the rate limiter of this group:
+            if (isUpdated && rateLimiter.tryIntervalLock(windowSize))
+            {
+                try
                 {
-                    try
-                    {
-                        // Update the rate limiter value based on the configured flow:
-                        if (flow.equals(Flow.FAST))
-                            rateLimiter.limiter = currentMax;
-                        else
-                            rateLimiter.limiter = currentMin;
+                    // Update the rate limiter value based on the configured flow:
+                    if (flow.equals(Flow.FAST))
+                        rateLimiter.limiter = currentMax;
+                    else
+                        rateLimiter.limiter = currentMin;
 
-                        tenSecsNoSpamLogger.info("{} currently applied for remote replicas: {}", rateLimiter.limiter, states);
-                    }
-                    finally
-                    {
-                        rateLimiter.releaseIntervalLock();
-                    }
+                    tenSecsNoSpamLogger.info("{} currently applied for remote replicas: {}", rateLimiter.limiter, states);
+                }
+                finally
+                {
+                    rateLimiter.releaseIntervalLock();
                 }
-                // Assigning a single rate limiter per replica group once per window size allows the back-pressure rate
-                // limiting to be stable within the group itself.
-
-                // Finally apply the rate limit with a max pause time equal to the provided timeout minus the
-                // response time computed from the incoming rate, to reduce the number of client timeouts by taking into
-                // account how long it could take to process responses after back-pressure:
-                long responseTimeInNanos = (long) (TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS) / minIncomingRate);
-                doRateLimit(rateLimiter.limiter, Math.max(0, TimeUnit.NANOSECONDS.convert(timeout, unit) - responseTimeInNanos));
-            }
-            catch (ExecutionException ex)
-            {
-                throw new IllegalStateException(ex);
             }
+            // Assigning a single rate limiter per replica group once per window size allows the back-pressure rate
+            // limiting to be stable within the group itself.
+
+            // Finally apply the rate limit with a max pause time equal to the provided timeout minus the
+            // response time computed from the incoming rate, to reduce the number of client timeouts by taking into
+            // account how long it could take to process responses after back-pressure:
+            long responseTimeInNanos = (long) (TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS) / minIncomingRate);
+            doRateLimit(rateLimiter.limiter, Math.max(0, TimeUnit.NANOSECONDS.convert(timeout, unit) - responseTimeInNanos));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c607d764/src/java/org/apache/cassandra/security/CipherFactory.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/security/CipherFactory.java b/src/java/org/apache/cassandra/security/CipherFactory.java
index 3f5c5f3..3c13629 100644
--- a/src/java/org/apache/cassandra/security/CipherFactory.java
+++ b/src/java/org/apache/cassandra/security/CipherFactory.java
@@ -25,17 +25,17 @@ import java.security.Key;
 import java.security.NoSuchAlgorithmException;
 import java.security.SecureRandom;
 import java.util.Arrays;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CompletionException;
 import javax.crypto.Cipher;
 import javax.crypto.NoSuchPaddingException;
 import javax.crypto.spec.IvParameterSpec;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -79,26 +79,19 @@ public class CipherFactory
             throw new RuntimeException("couldn't load cipher factory", e);
         }
 
-        cache = CacheBuilder.newBuilder() // by default cache is unbounded
+        cache = Caffeine.newBuilder() // by default cache is unbounded
                 .maximumSize(64) // a value large enough that we should never even get close (so nothing gets evicted)
-                .concurrencyLevel(Runtime.getRuntime().availableProcessors())
-                .removalListener(new RemovalListener<String, Key>()
+                .executor(MoreExecutors.directExecutor())
+                .removalListener((key, value, cause) ->
                 {
-                    public void onRemoval(RemovalNotification<String, Key> notice)
-                    {
-                        // maybe reload the key? (to avoid the reload being on the user's dime)
-                        logger.info("key {} removed from cipher key cache", notice.getKey());
-                    }
+                    // maybe reload the key? (to avoid the reload being on the user's dime)
+                    logger.info("key {} removed from cipher key cache", key);
                 })
-                .build(new CacheLoader<String, Key>()
-                {
-                    @Override
-                    public Key load(String alias) throws Exception
-                    {
-                        logger.info("loading secret key for alias {}", alias);
-                        return keyProvider.getSecretKey(alias);
-                    }
-                });
+                .build(alias ->
+                       {
+                           logger.info("loading secret key for alias {}", alias);
+                           return keyProvider.getSecretKey(alias);
+                       });
     }
 
     public Cipher getEncryptor(String transformation, String keyAlias) throws IOException
@@ -148,7 +141,7 @@ public class CipherFactory
         {
             return cache.get(keyAlias);
         }
-        catch (ExecutionException e)
+        catch (CompletionException e)
         {
             if (e.getCause() instanceof IOException)
                 throw (IOException)e.getCause();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c607d764/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index 26489c6..e64ec75 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -114,7 +114,7 @@ public class CacheService implements CacheServiceMBean
         // as values are constant size we can use singleton weigher
         // where 48 = 40 bytes (average size of the key) + 8 bytes (size of value)
         ICache<KeyCacheKey, RowIndexEntry> kc;
-        kc = ConcurrentLinkedHashCache.create(keyCacheInMemoryCapacity);
+        kc = CaffeineCache.create(keyCacheInMemoryCapacity);
         AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = new AutoSavingCache<>(kc, CacheType.KEY_CACHE, new KeyCacheSerializer());
 
         int keyCacheKeysToSave = DatabaseDescriptor.getKeyCacheKeysToSave();
@@ -163,7 +163,7 @@ public class CacheService implements CacheServiceMBean
         long capacity = DatabaseDescriptor.getCounterCacheSizeInMB() * 1024 * 1024;
 
         AutoSavingCache<CounterCacheKey, ClockAndCount> cache =
-            new AutoSavingCache<>(ConcurrentLinkedHashCache.<CounterCacheKey, ClockAndCount>create(capacity),
+            new AutoSavingCache<>(CaffeineCache.create(capacity),
                                   CacheType.COUNTER_CACHE,
                                   new CounterCacheSerializer());
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c607d764/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
index 371f177..f6a8acf 100644
--- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
+++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java
@@ -18,34 +18,35 @@
  */
 package org.apache.cassandra.cache;
 
-import java.nio.ByteBuffer;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-import static org.junit.Assert.*;
-
-
-import java.util.ArrayList;
-import java.util.List;
-
+import com.github.benmanes.caffeine.cache.Weigher;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.utils.Pair;
 
-import com.googlecode.concurrentlinkedhashmap.Weighers;
-
 import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.db.rows.*;
+import org.apache.cassandra.db.RowUpdateBuilder;
 import org.apache.cassandra.db.marshal.AsciiType;
-import org.apache.cassandra.db.partitions.*;
+import org.apache.cassandra.db.partitions.CachedBTreePartition;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.UnfilteredRowIterators;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.KeyspaceParams;
 import org.apache.cassandra.utils.FBUtilities;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertTrue;
 
 public class CacheProviderTest
 {
@@ -147,7 +148,8 @@ public class CacheProviderTest
     @Test
     public void testSerializingCache() throws InterruptedException
     {
-        ICache<MeasureableString, IRowCacheEntry> cache = SerializingCache.create(CAPACITY, Weighers.<RefCountedMemory>singleton(), new SerializingCacheProvider.RowCacheSerializer());
+        ICache<MeasureableString, IRowCacheEntry> cache = SerializingCache.create(CAPACITY,
+            Weigher.singletonWeigher(), new SerializingCacheProvider.RowCacheSerializer());
         CachedBTreePartition partition = createPartition();
         simpleCase(partition, cache);
         concurrentCase(partition, cache);

Reply via email to