Repository: cassandra Updated Branches: refs/heads/trunk 95c4320ba -> c607d7641
http://git-wip-us.apache.org/repos/asf/cassandra/blob/c607d764/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java b/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java index 51f6569..052830a 100644 --- a/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java +++ b/src/java/org/apache/cassandra/metrics/HintedHandoffMetrics.java @@ -20,16 +20,17 @@ package org.apache.cassandra.metrics; import java.net.InetAddress; import java.util.Map.Entry; +import com.google.common.util.concurrent.MoreExecutors; + import com.codahale.metrics.Counter; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; + import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.utils.UUIDGen; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; - import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; /** @@ -42,31 +43,23 @@ public class HintedHandoffMetrics private static final MetricNameFactory factory = new DefaultNameFactory("HintedHandOffManager"); /** Total number of hints which are not stored, This is not a cache. */ - private final LoadingCache<InetAddress, DifferencingCounter> notStored = CacheBuilder.newBuilder().build(new CacheLoader<InetAddress, DifferencingCounter>() - { - public DifferencingCounter load(InetAddress address) - { - return new DifferencingCounter(address); - } - }); + private final LoadingCache<InetAddress, DifferencingCounter> notStored = Caffeine.newBuilder() + .executor(MoreExecutors.directExecutor()) + .build(DifferencingCounter::new); /** Total number of hints that have been created, This is not a cache. */ - private final LoadingCache<InetAddress, Counter> createdHintCounts = CacheBuilder.newBuilder().build(new CacheLoader<InetAddress, Counter>() - { - public Counter load(InetAddress address) - { - return Metrics.counter(factory.createMetricName("Hints_created-" + address.getHostAddress().replace(':', '.'))); - } - }); + private final LoadingCache<InetAddress, Counter> createdHintCounts = Caffeine.newBuilder() + .executor(MoreExecutors.directExecutor()) + .build(address -> Metrics.counter(factory.createMetricName("Hints_created-" + address.getHostAddress().replace(':', '.')))); public void incrCreatedHints(InetAddress address) { - createdHintCounts.getUnchecked(address).inc(); + createdHintCounts.get(address).inc(); } public void incrPastWindow(InetAddress address) { - notStored.getUnchecked(address).mark(); + notStored.get(address).mark(); } public void log() http://git-wip-us.apache.org/repos/asf/cassandra/blob/c607d764/src/java/org/apache/cassandra/net/RateBasedBackPressure.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/RateBasedBackPressure.java b/src/java/org/apache/cassandra/net/RateBasedBackPressure.java index 1dae243..64685b0 100644 --- a/src/java/org/apache/cassandra/net/RateBasedBackPressure.java +++ b/src/java/org/apache/cassandra/net/RateBasedBackPressure.java @@ -20,13 +20,13 @@ package org.apache.cassandra.net; import java.net.InetAddress; import java.util.Map; import java.util.Set; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import com.google.common.annotations.VisibleForTesting; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.RateLimiter; import org.slf4j.Logger; @@ -63,7 +63,10 @@ public class RateBasedBackPressure implements BackPressureStrategy<RateBasedBack protected final long windowSize; private final Cache<Set<RateBasedBackPressureState>, IntervalRateLimiter> rateLimiters = - CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS).build(); + Caffeine.newBuilder() + .expireAfterAccess(1, TimeUnit.HOURS) + .executor(MoreExecutors.directExecutor()) + .build(); enum Flow { @@ -217,42 +220,35 @@ public class RateBasedBackPressure implements BackPressureStrategy<RateBasedBack // Now find the rate limiter corresponding to the replica group represented by these back-pressure states: if (!states.isEmpty()) { - try - { - // Get the rate limiter: - IntervalRateLimiter rateLimiter = rateLimiters.get(states, () -> new IntervalRateLimiter(timeSource)); + // Get the rate limiter: + IntervalRateLimiter rateLimiter = rateLimiters.get(states, key -> new IntervalRateLimiter(timeSource)); - // If the back-pressure was updated and we acquire the interval lock for the rate limiter of this group: - if (isUpdated && rateLimiter.tryIntervalLock(windowSize)) + // If the back-pressure was updated and we acquire the interval lock for the rate limiter of this group: + if (isUpdated && rateLimiter.tryIntervalLock(windowSize)) + { + try { - try - { - // Update the rate limiter value based on the configured flow: - if (flow.equals(Flow.FAST)) - rateLimiter.limiter = currentMax; - else - rateLimiter.limiter = currentMin; + // Update the rate limiter value based on the configured flow: + if (flow.equals(Flow.FAST)) + rateLimiter.limiter = currentMax; + else + rateLimiter.limiter = currentMin; - tenSecsNoSpamLogger.info("{} currently applied for remote replicas: {}", rateLimiter.limiter, states); - } - finally - { - rateLimiter.releaseIntervalLock(); - } + tenSecsNoSpamLogger.info("{} currently applied for remote replicas: {}", rateLimiter.limiter, states); + } + finally + { + rateLimiter.releaseIntervalLock(); } - // Assigning a single rate limiter per replica group once per window size allows the back-pressure rate - // limiting to be stable within the group itself. - - // Finally apply the rate limit with a max pause time equal to the provided timeout minus the - // response time computed from the incoming rate, to reduce the number of client timeouts by taking into - // account how long it could take to process responses after back-pressure: - long responseTimeInNanos = (long) (TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS) / minIncomingRate); - doRateLimit(rateLimiter.limiter, Math.max(0, TimeUnit.NANOSECONDS.convert(timeout, unit) - responseTimeInNanos)); - } - catch (ExecutionException ex) - { - throw new IllegalStateException(ex); } + // Assigning a single rate limiter per replica group once per window size allows the back-pressure rate + // limiting to be stable within the group itself. + + // Finally apply the rate limit with a max pause time equal to the provided timeout minus the + // response time computed from the incoming rate, to reduce the number of client timeouts by taking into + // account how long it could take to process responses after back-pressure: + long responseTimeInNanos = (long) (TimeUnit.NANOSECONDS.convert(1, TimeUnit.SECONDS) / minIncomingRate); + doRateLimit(rateLimiter.limiter, Math.max(0, TimeUnit.NANOSECONDS.convert(timeout, unit) - responseTimeInNanos)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/c607d764/src/java/org/apache/cassandra/security/CipherFactory.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/security/CipherFactory.java b/src/java/org/apache/cassandra/security/CipherFactory.java index 3f5c5f3..3c13629 100644 --- a/src/java/org/apache/cassandra/security/CipherFactory.java +++ b/src/java/org/apache/cassandra/security/CipherFactory.java @@ -25,17 +25,17 @@ import java.security.Key; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; import java.util.Arrays; -import java.util.concurrent.ExecutionException; +import java.util.concurrent.CompletionException; import javax.crypto.Cipher; import javax.crypto.NoSuchPaddingException; import javax.crypto.spec.IvParameterSpec; import com.google.common.annotations.VisibleForTesting; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; +import com.google.common.util.concurrent.MoreExecutors; + +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.LoadingCache; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -79,26 +79,19 @@ public class CipherFactory throw new RuntimeException("couldn't load cipher factory", e); } - cache = CacheBuilder.newBuilder() // by default cache is unbounded + cache = Caffeine.newBuilder() // by default cache is unbounded .maximumSize(64) // a value large enough that we should never even get close (so nothing gets evicted) - .concurrencyLevel(Runtime.getRuntime().availableProcessors()) - .removalListener(new RemovalListener<String, Key>() + .executor(MoreExecutors.directExecutor()) + .removalListener((key, value, cause) -> { - public void onRemoval(RemovalNotification<String, Key> notice) - { - // maybe reload the key? (to avoid the reload being on the user's dime) - logger.info("key {} removed from cipher key cache", notice.getKey()); - } + // maybe reload the key? (to avoid the reload being on the user's dime) + logger.info("key {} removed from cipher key cache", key); }) - .build(new CacheLoader<String, Key>() - { - @Override - public Key load(String alias) throws Exception - { - logger.info("loading secret key for alias {}", alias); - return keyProvider.getSecretKey(alias); - } - }); + .build(alias -> + { + logger.info("loading secret key for alias {}", alias); + return keyProvider.getSecretKey(alias); + }); } public Cipher getEncryptor(String transformation, String keyAlias) throws IOException @@ -148,7 +141,7 @@ public class CipherFactory { return cache.get(keyAlias); } - catch (ExecutionException e) + catch (CompletionException e) { if (e.getCause() instanceof IOException) throw (IOException)e.getCause(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c607d764/src/java/org/apache/cassandra/service/CacheService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java index 26489c6..e64ec75 100644 --- a/src/java/org/apache/cassandra/service/CacheService.java +++ b/src/java/org/apache/cassandra/service/CacheService.java @@ -114,7 +114,7 @@ public class CacheService implements CacheServiceMBean // as values are constant size we can use singleton weigher // where 48 = 40 bytes (average size of the key) + 8 bytes (size of value) ICache<KeyCacheKey, RowIndexEntry> kc; - kc = ConcurrentLinkedHashCache.create(keyCacheInMemoryCapacity); + kc = CaffeineCache.create(keyCacheInMemoryCapacity); AutoSavingCache<KeyCacheKey, RowIndexEntry> keyCache = new AutoSavingCache<>(kc, CacheType.KEY_CACHE, new KeyCacheSerializer()); int keyCacheKeysToSave = DatabaseDescriptor.getKeyCacheKeysToSave(); @@ -163,7 +163,7 @@ public class CacheService implements CacheServiceMBean long capacity = DatabaseDescriptor.getCounterCacheSizeInMB() * 1024 * 1024; AutoSavingCache<CounterCacheKey, ClockAndCount> cache = - new AutoSavingCache<>(ConcurrentLinkedHashCache.<CounterCacheKey, ClockAndCount>create(capacity), + new AutoSavingCache<>(CaffeineCache.create(capacity), CacheType.COUNTER_CACHE, new CounterCacheSerializer()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/c607d764/test/unit/org/apache/cassandra/cache/CacheProviderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java index 371f177..f6a8acf 100644 --- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java +++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java @@ -18,34 +18,35 @@ */ package org.apache.cassandra.cache; -import java.nio.ByteBuffer; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; - -import org.junit.BeforeClass; -import org.junit.Test; -import static org.junit.Assert.*; - - -import java.util.ArrayList; -import java.util.List; - +import com.github.benmanes.caffeine.cache.Weigher; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.utils.Pair; -import com.googlecode.concurrentlinkedhashmap.Weighers; - import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.RowUpdateBuilder; import org.apache.cassandra.db.marshal.AsciiType; -import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.partitions.CachedBTreePartition; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.UnfilteredRowIterators; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.utils.FBUtilities; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.nio.ByteBuffer; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertTrue; public class CacheProviderTest { @@ -147,7 +148,8 @@ public class CacheProviderTest @Test public void testSerializingCache() throws InterruptedException { - ICache<MeasureableString, IRowCacheEntry> cache = SerializingCache.create(CAPACITY, Weighers.<RefCountedMemory>singleton(), new SerializingCacheProvider.RowCacheSerializer()); + ICache<MeasureableString, IRowCacheEntry> cache = SerializingCache.create(CAPACITY, + Weigher.singletonWeigher(), new SerializingCacheProvider.RowCacheSerializer()); CachedBTreePartition partition = createPartition(); simpleCase(partition, cache); concurrentCase(partition, cache);
