Repository: cassandra Updated Branches: refs/heads/trunk 732986bbd -> dfd78d0e9
Serializing Row cache alternative, fully off heap Patch by Robert Stupp, reviewed by Ariel Weisberg for CASSANDRA-7438 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/dfd78d0e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/dfd78d0e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/dfd78d0e Branch: refs/heads/trunk Commit: dfd78d0e90b9995872b00f6f33afc180f2b7cba1 Parents: 732986b Author: Robert Stupp <[email protected]> Authored: Wed Jan 21 20:50:35 2015 +0100 Committer: Robert Stupp <[email protected]> Committed: Wed Jan 21 20:50:35 2015 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + NOTICE.txt | 5 + build.xml | 3 +- conf/cassandra.yaml | 22 +- lib/licenses/ohc-0.2.1.txt | 201 ++++++++++++++ lib/ohc-core-0.3.1.jar | Bin 0 -> 143623 bytes lib/ohc-core-j8-0.3.1.jar | Bin 0 -> 5026 bytes .../apache/cassandra/cache/AutoSavingCache.java | 42 ++- .../apache/cassandra/cache/CacheProvider.java | 23 ++ .../cache/ConcurrentLinkedHashCache.java | 12 +- src/java/org/apache/cassandra/cache/ICache.java | 6 +- .../cassandra/cache/InstrumentingCache.java | 24 +- .../cassandra/cache/NopCacheProvider.java | 93 +++++++ .../org/apache/cassandra/cache/OHCProvider.java | 274 +++++++++++++++++++ .../org/apache/cassandra/cache/RowCacheKey.java | 6 + .../cassandra/cache/SerializingCache.java | 10 +- .../cache/SerializingCacheProvider.java | 7 +- .../org/apache/cassandra/config/Config.java | 1 + .../cassandra/config/DatabaseDescriptor.java | 15 +- .../config/YamlConfigurationLoader.java | 2 +- .../apache/cassandra/db/ColumnFamilyStore.java | 12 +- .../apache/cassandra/service/CacheService.java | 23 +- test/conf/cassandra.yaml | 2 + .../org/apache/cassandra/db/KeyCacheTest.java | 9 +- .../org/apache/cassandra/db/RowCacheTest.java | 14 +- 25 files changed, 731 insertions(+), 76 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index cc6478e..99a5d51 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0 + * Serializing Row cache alternative, fully off heap (CASSANDRA-7438) * Duplicate rows returned when in clause has repeated values (CASSANDRA-6707) * Make CassandraException unchecked, extend RuntimeException (CASSANDRA-8560) * Support direct buffer decompression for reads (CASSANDRA-8464) http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/NOTICE.txt ---------------------------------------------------------------------- diff --git a/NOTICE.txt b/NOTICE.txt index 2fe15f5..43d4514 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -66,3 +66,8 @@ Javassist SIGAR http://sigar.hyperic.com/ + +OHC +(https://github.com/snazy/ohc) +Java Off-Heap-Cache, licensed under APLv2 +Copyright 2014-2015 Robert Stupp, Germany. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/build.xml ---------------------------------------------------------------------- diff --git a/build.xml b/build.xml index baf6a77..fccd009 100644 --- a/build.xml +++ b/build.xml @@ -375,6 +375,7 @@ <dependency groupId="com.clearspring.analytics" artifactId="stream" version="2.5.2" /> <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core" version="2.1.2" /> <dependency groupId="org.javassist" artifactId="javassist" version="3.18.2-GA" /> + <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core" version="0.3.1" /> <dependency groupId="net.sf.supercsv" artifactId="super-csv" version="2.1.0" /> <dependency groupId="net.ju-n.compile-command-annotations" artifactId="compile-command-annotations" version="1.2.0" /> <dependency groupId="org.fusesource" artifactId="sigar" version="1.6.4"> @@ -424,10 +425,10 @@ <dependency groupId="org.antlr" artifactId="antlr"/> <dependency groupId="com.datastax.cassandra" artifactId="cassandra-driver-core"/> <dependency groupId="org.javassist" artifactId="javassist"/> + <dependency groupId="org.caffinitas.ohc" artifactId="ohc-core"/> <dependency groupId="org.openjdk.jmh" artifactId="jmh-core"/> <dependency groupId="org.openjdk.jmh" artifactId="jmh-generator-annprocess"/> <dependency groupId="net.ju-n.compile-command-annotations" artifactId="compile-command-annotations"/> - <dependency groupId="org.javassist" artifactId="javassist" /> </artifact:pom> <artifact:pom id="coverage-deps-pom" http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/conf/cassandra.yaml ---------------------------------------------------------------------- diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml index 24bab09..e1bfca6 100644 --- a/conf/cassandra.yaml +++ b/conf/cassandra.yaml @@ -181,15 +181,25 @@ key_cache_save_period: 14400 # Disabled by default, meaning all keys are going to be saved # key_cache_keys_to_save: 100 +# Row cache implementation class 
name. +# Available implementations: +# org.apache.cassandra.cache.OHCProvider Fully off-heap row cache implementation (default). +# org.apache.cassandra.cache.SerializingCacheProvider This is the row cache implementation available +# in previous releases of Cassandra. +# row_cache_class_name: org.apache.cassandra.cache.OHCProvider + # Maximum size of the row cache in memory. -# NOTE: if you reduce the size, you may not get you hottest keys loaded on startup. +# Please note that the OHC cache implementation requires some additional off-heap memory to manage +# the map structures and some in-flight memory during operations before/after cache entries can be +# accounted against the cache capacity. This overhead is usually small compared to the whole capacity. +# Do not specify more memory than the system can afford in the worst usual situation and leave some +# headroom for OS block level cache. Never allow your system to swap. # # Default value is 0, to disable row caching. row_cache_size_in_mb: 0 -# Duration in seconds after which Cassandra should -# save the row cache. Caches are saved to saved_caches_directory as specified -# in this configuration file. +# Duration in seconds after which Cassandra should save the row cache. +# Caches are saved to saved_caches_directory as specified in this configuration file. # # Saved caches greatly improve cold-start speeds, and is relatively cheap in # terms of I/O for the key cache. Row cache saving is much more expensive and @@ -198,8 +208,8 @@ row_cache_size_in_mb: 0 # Default is 0 to disable saving the row cache. row_cache_save_period: 0 -# Number of keys from the row cache to save -# Disabled by default, meaning all keys are going to be saved +# Number of keys from the row cache to save. +# Specify 0 (which is the default), meaning all keys are going to be saved # row_cache_keys_to_save: 100 # Maximum size of the counter cache in memory. 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/lib/licenses/ohc-0.2.1.txt ---------------------------------------------------------------------- diff --git a/lib/licenses/ohc-0.2.1.txt b/lib/licenses/ohc-0.2.1.txt new file mode 100644 index 0000000..eb6b5d3 --- /dev/null +++ b/lib/licenses/ohc-0.2.1.txt @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. 
+ + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. 
You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. 
+ + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. 
In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. 
+ + Copyright 2014 Robert Stupp, Koeln, Germany, robert-stupp.de + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/lib/ohc-core-0.3.1.jar ---------------------------------------------------------------------- diff --git a/lib/ohc-core-0.3.1.jar b/lib/ohc-core-0.3.1.jar new file mode 100644 index 0000000..a6d2c9c Binary files /dev/null and b/lib/ohc-core-0.3.1.jar differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/lib/ohc-core-j8-0.3.1.jar ---------------------------------------------------------------------- diff --git a/lib/ohc-core-j8-0.3.1.jar b/lib/ohc-core-j8-0.3.1.jar new file mode 100644 index 0000000..efe7682 Binary files /dev/null and b/lib/ohc-core-j8-0.3.1.jar differ http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/cache/AutoSavingCache.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/AutoSavingCache.java b/src/java/org/apache/cassandra/cache/AutoSavingCache.java index 92e6a6d..fd87631 100644 --- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java +++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java @@ -59,7 +59,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K protected volatile ScheduledFuture<?> saveTask; protected final CacheService.CacheType cacheType; - private CacheSerializer<K, V> cacheLoader; + private final 
CacheSerializer<K, V> cacheLoader; private static final String CURRENT_VERSION = "b"; private static volatile IStreamFactory streamFactory = new IStreamFactory() @@ -177,16 +177,24 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K public class Writer extends CompactionInfo.Holder { - private final Set<K> keys; + private final Iterator<K> keyIterator; private final CompactionInfo info; private long keysWritten; + private final long keysEstimate; protected Writer(int keysToSave) { - if (keysToSave >= getKeySet().size()) - keys = getKeySet(); + int size = size(); + if (keysToSave >= size || keysToSave == 0) + { + keyIterator = keyIterator(); + keysEstimate = size; + } else - keys = hotKeySet(keysToSave); + { + keyIterator = hotKeyIterator(keysToSave); + keysEstimate = keysToSave; + } OperationType type; if (cacheType == CacheService.CacheType.KEY_CACHE) @@ -201,7 +209,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K info = new CompactionInfo(CFMetaData.denseCFMetaData(SystemKeyspace.NAME, cacheType.toString(), BytesType.instance), type, 0, - keys.size(), + keysEstimate, "keys"); } @@ -213,7 +221,8 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K public CompactionInfo getCompactionInfo() { // keyset can change in size, thus total can too - return info.forProgress(keysWritten, Math.max(keysWritten, keys.size())); + // TODO need to check for this one... 
was: info.forProgress(keysWritten, Math.max(keysWritten, keys.size())); + return info.forProgress(keysWritten, Math.max(keysWritten, keysEstimate)); } public void saveCache() @@ -221,7 +230,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K logger.debug("Deleting old {} files.", cacheType); deleteOldCacheFiles(); - if (keys.isEmpty()) + if (!keyIterator.hasNext()) { logger.debug("Skipping {} save, cache is empty.", cacheType); return; @@ -235,8 +244,9 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K try { - for (K key : keys) + while (keyIterator.hasNext()) { + K key = keyIterator.next(); UUID cfId = key.getCFId(); if (!Schema.instance.hasCF(key.getCFId())) continue; // the table has been dropped. @@ -270,10 +280,22 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K } keysWritten++; + if (keysWritten >= keysEstimate) + break; } } finally { + if (keyIterator instanceof Closeable) + try + { + ((Closeable)keyIterator).close(); + } + catch (IOException ignored) + { + // not thrown (by OHC) + } + for (OutputStream writer : streams.values()) FileUtils.closeQuietly(writer); } @@ -290,7 +312,7 @@ public class AutoSavingCache<K extends CacheKey, V> extends InstrumentingCache<K logger.error("Unable to rename {} to {}", tmpFile, cacheFile); } - logger.info("Saved {} ({} items) in {} ms", cacheType, keys.size(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); + logger.info("Saved {} ({} items) in {} ms", cacheType, keysWritten, TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)); } private File tempCacheFile(UUID cfId) http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/cache/CacheProvider.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/CacheProvider.java b/src/java/org/apache/cassandra/cache/CacheProvider.java new file mode 100644 index 
0000000..6a97be3 --- /dev/null +++ b/src/java/org/apache/cassandra/cache/CacheProvider.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cache; + +public interface CacheProvider<K, V> +{ + ICache<K, V> create(); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java b/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java index 8182447..bb14055 100644 --- a/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java +++ b/src/java/org/apache/cassandra/cache/ConcurrentLinkedHashCache.java @@ -17,7 +17,7 @@ */ package org.apache.cassandra.cache; -import java.util.Set; +import java.util.Iterator; import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; import com.googlecode.concurrentlinkedhashmap.EntryWeigher; @@ -45,7 +45,7 @@ public class ConcurrentLinkedHashCache<K extends IMeasurableMemory, V extends IM .concurrencyLevel(DEFAULT_CONCURENCY_LEVEL) .build(); - return new ConcurrentLinkedHashCache<K, 
V>(map); + return new ConcurrentLinkedHashCache<>(map); } public static <K extends IMeasurableMemory, V extends IMeasurableMemory> ConcurrentLinkedHashCache<K, V> create(long weightedCapacity) @@ -116,14 +116,14 @@ public class ConcurrentLinkedHashCache<K extends IMeasurableMemory, V extends IM map.remove(key); } - public Set<K> keySet() + public Iterator<K> keyIterator() { - return map.keySet(); + return map.keySet().iterator(); } - public Set<K> hotKeySet(int n) + public Iterator<K> hotKeyIterator(int n) { - return map.descendingKeySetWithLimit(n); + return map.descendingKeySetWithLimit(n).iterator(); } public boolean containsKey(K key) http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/cache/ICache.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/ICache.java b/src/java/org/apache/cassandra/cache/ICache.java index 22dbb16..37b55cd 100644 --- a/src/java/org/apache/cassandra/cache/ICache.java +++ b/src/java/org/apache/cassandra/cache/ICache.java @@ -17,7 +17,7 @@ */ package org.apache.cassandra.cache; -import java.util.Set; +import java.util.Iterator; /** * This is similar to the Map interface, but requires maintaining a given capacity @@ -46,9 +46,9 @@ public interface ICache<K, V> public void clear(); - public Set<K> keySet(); + public Iterator<K> keyIterator(); - public Set<K> hotKeySet(int n); + public Iterator<K> hotKeyIterator(int n); public boolean containsKey(K key); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/cache/InstrumentingCache.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/InstrumentingCache.java b/src/java/org/apache/cassandra/cache/InstrumentingCache.java index 311b373..c8728fd 100644 --- a/src/java/org/apache/cassandra/cache/InstrumentingCache.java +++ 
b/src/java/org/apache/cassandra/cache/InstrumentingCache.java @@ -17,7 +17,7 @@ */ package org.apache.cassandra.cache; -import java.util.Set; +import java.util.Iterator; import org.apache.cassandra.metrics.CacheMetrics; @@ -26,7 +26,6 @@ import org.apache.cassandra.metrics.CacheMetrics; */ public class InstrumentingCache<K, V> { - private volatile boolean capacitySetManually; private final ICache<K, V> map; private final String type; @@ -78,20 +77,9 @@ public class InstrumentingCache<K, V> return map.capacity(); } - public boolean isCapacitySetManually() - { - return capacitySetManually; - } - - public void updateCapacity(long capacity) - { - map.setCapacity(capacity); - } - public void setCapacity(long capacity) { - updateCapacity(capacity); - capacitySetManually = true; + map.setCapacity(capacity); } public int size() @@ -110,14 +98,14 @@ public class InstrumentingCache<K, V> metrics = new CacheMetrics(type, map); } - public Set<K> getKeySet() + public Iterator<K> keyIterator() { - return map.keySet(); + return map.keyIterator(); } - public Set<K> hotKeySet(int n) + public Iterator<K> hotKeyIterator(int n) { - return map.hotKeySet(n); + return map.hotKeyIterator(n); } public boolean containsKey(K key) http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/cache/NopCacheProvider.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/NopCacheProvider.java b/src/java/org/apache/cassandra/cache/NopCacheProvider.java new file mode 100644 index 0000000..20f837a --- /dev/null +++ b/src/java/org/apache/cassandra/cache/NopCacheProvider.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cache; + +import java.util.Collections; +import java.util.Iterator; + +public class NopCacheProvider implements CacheProvider<RowCacheKey, IRowCacheEntry> +{ + public ICache<RowCacheKey, IRowCacheEntry> create() + { + return new NopCache(); + } + + private static class NopCache implements ICache<RowCacheKey, IRowCacheEntry> + { + public long capacity() + { + return 0; + } + + public void setCapacity(long capacity) + { + } + + public void put(RowCacheKey key, IRowCacheEntry value) + { + } + + public boolean putIfAbsent(RowCacheKey key, IRowCacheEntry value) + { + return false; + } + + public boolean replace(RowCacheKey key, IRowCacheEntry old, IRowCacheEntry value) + { + return false; + } + + public IRowCacheEntry get(RowCacheKey key) + { + return null; + } + + public void remove(RowCacheKey key) + { + } + + public int size() + { + return 0; + } + + public long weightedSize() + { + return 0; + } + + public void clear() + { + } + + public Iterator<RowCacheKey> hotKeyIterator(int n) + { + return Collections.emptyIterator(); + } + + public Iterator<RowCacheKey> keyIterator() + { + return Collections.emptyIterator(); + } + + public boolean containsKey(RowCacheKey key) + { + return false; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/cache/OHCProvider.java 
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/OHCProvider.java b/src/java/org/apache/cassandra/cache/OHCProvider.java
new file mode 100644
index 0000000..365ca41
--- /dev/null
+++ b/src/java/org/apache/cassandra/cache/OHCProvider.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.cache;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.UUID;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.ColumnFamily;
+import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.io.util.Memory;
+import org.apache.cassandra.net.MessagingService;
+import org.caffinitas.ohc.OHCache;
+import org.caffinitas.ohc.OHCacheBuilder;
+
+/**
+ * {@link CacheProvider} that backs the row cache with an OHC ({@code org.caffinitas.ohc}) cache,
+ * converting keys and values with the nested {@code KeySerializer} and {@code ValueSerializer}.
+ */
+public class OHCProvider implements CacheProvider<RowCacheKey, IRowCacheEntry>
+{
+    /**
+     * Builds an OHC cache sized from {@code DatabaseDescriptor.getRowCacheSizeInMB()} (converted to bytes)
+     * and wraps it in an {@link ICache} adapter.
+     */
+    public ICache<RowCacheKey, IRowCacheEntry> create()
+    {
+        OHCacheBuilder<RowCacheKey, IRowCacheEntry> builder = OHCacheBuilder.newBuilder();
+        builder.capacity(DatabaseDescriptor.getRowCacheSizeInMB() * 1024 * 1024)
+               .keySerializer(new KeySerializer())
+               .valueSerializer(new ValueSerializer())
+               .throwOOME(true);
+
+        return new OHCacheAdapter(builder.build());
+    }
+
+    /** Exposes an {@link OHCache} through the {@link ICache} interface by delegating each call. */
+    private static class OHCacheAdapter implements ICache<RowCacheKey, IRowCacheEntry>
+    {
+        private final OHCache<RowCacheKey, IRowCacheEntry> ohCache;
+
+        public OHCacheAdapter(OHCache<RowCacheKey, IRowCacheEntry> ohCache)
+        {
+            this.ohCache = ohCache;
+        }
+
+        public long capacity()
+        {
+            return ohCache.capacity();
+        }
+
+        public void setCapacity(long capacity)
+        {
+            ohCache.setCapacity(capacity);
+        }
+
+        public void put(RowCacheKey key, IRowCacheEntry value)
+        {
+            ohCache.put(key, value);
+        }
+
+        public boolean putIfAbsent(RowCacheKey key, IRowCacheEntry value)
+        {
+            return ohCache.putIfAbsent(key, value);
+        }
+
+        public boolean replace(RowCacheKey key, IRowCacheEntry old, IRowCacheEntry value)
+        {
+            return ohCache.addOrReplace(key, old, value);
+        }
+
+        public IRowCacheEntry get(RowCacheKey key)
+        {
+            return ohCache.get(key);
+        }
+
+        public void remove(RowCacheKey key)
+        {
+            ohCache.remove(key);
+        }
+
+        public int size()
+        {
+            // clamp instead of letting the narrowing cast wrap around for very large entry counts
+            return (int) Math.min(ohCache.size(), Integer.MAX_VALUE);
+        }
+
+        public long weightedSize()
+        {
+            // NOTE(review): this reports the same value as size(), not a memory footprint;
+            // confirm whether OHCache offers a bytes-used figure that should be returned here
+            return ohCache.size();
+        }
+
+        public void clear()
+        {
+            ohCache.clear();
+        }
+
+        public Iterator<RowCacheKey> hotKeyIterator(int n)
+        {
+            return ohCache.hotKeyIterator(n);
+        }
+
+        public Iterator<RowCacheKey> keyIterator()
+        {
+            return ohCache.keyIterator();
+        }
+
+        public boolean containsKey(RowCacheKey key)
+        {
+            return ohCache.containsKey(key);
+        }
+    }
+
+    /** Serializes a key as the cfId's two longs, an int length and the raw key bytes. */
+    private static class KeySerializer implements org.caffinitas.ohc.CacheSerializer<RowCacheKey>
+    {
+        public void serialize(RowCacheKey rowCacheKey, DataOutput dataOutput) throws IOException
+        {
+            dataOutput.writeLong(rowCacheKey.cfId.getMostSignificantBits());
+            dataOutput.writeLong(rowCacheKey.cfId.getLeastSignificantBits());
+            dataOutput.writeInt(rowCacheKey.key.length);
+            dataOutput.write(rowCacheKey.key);
+        }
+
+        public RowCacheKey deserialize(DataInput dataInput) throws IOException
+        {
+            long msb = dataInput.readLong();
+            long lsb = dataInput.readLong();
+            byte[] key = new byte[dataInput.readInt()];
+            dataInput.readFully(key);
+            return new RowCacheKey(new UUID(msb, lsb), key);
+        }
+
+        public int serializedSize(RowCacheKey rowCacheKey)
+        {
+            // 20 = two 8-byte longs (cfId) + the 4-byte length prefix written by serialize()
+            return 20 + rowCacheKey.key.length;
+        }
+    }
+
+    /** Serializes a cache entry as a boolean sentinel flag followed by either the sentinel id or the row. */
+    private static class ValueSerializer implements org.caffinitas.ohc.CacheSerializer<IRowCacheEntry>
+    {
+        public void serialize(IRowCacheEntry entry, DataOutput out) throws IOException
+        {
+            assert entry != null; // unlike CFS we don't support nulls, since there is no need for that in the cache
+            boolean isSentinel = entry instanceof RowCacheSentinel;
+            out.writeBoolean(isSentinel);
+            if (isSentinel)
+                out.writeLong(((RowCacheSentinel) entry).sentinelId);
+            else
+                ColumnFamily.serializer.serialize((ColumnFamily) entry, new DataOutputPlusAdapter(out), MessagingService.current_version);
+        }
+
+        public IRowCacheEntry deserialize(DataInput in) throws IOException
+        {
+            boolean isSentinel = in.readBoolean();
+            if (isSentinel)
+                return new RowCacheSentinel(in.readLong());
+            return ColumnFamily.serializer.deserialize(in, MessagingService.current_version);
+        }
+
+        public int serializedSize(IRowCacheEntry entry)
+        {
+            TypeSizes typeSizes = TypeSizes.NATIVE;
+            int size = typeSizes.sizeof(true);
+            if (entry instanceof RowCacheSentinel)
+                size += typeSizes.sizeof(((RowCacheSentinel) entry).sentinelId);
+            else
+                size += ColumnFamily.serializer.serializedSize((ColumnFamily) entry, typeSizes, MessagingService.current_version);
+            return size;
+        }
+    }
+
+    /** Wraps a plain {@link DataOutput} so it can be used as a {@link DataOutputPlus}; write(Memory) is not implemented. */
+    static class DataOutputPlusAdapter implements DataOutputPlus
+    {
+        private final DataOutput out;
+
+        public void write(byte[] b) throws IOException
+        {
+            out.write(b);
+        }
+
+        public void write(byte[] b, int off, int len) throws IOException
+        {
+            out.write(b, off, len);
+        }
+
+        public void write(int b) throws IOException
+        {
+            out.write(b);
+        }
+
+        public void writeBoolean(boolean v) throws IOException
+        {
+            out.writeBoolean(v);
+        }
+
+        public void writeByte(int v) throws IOException
+        {
+            out.writeByte(v);
+        }
+
+        public void writeBytes(String s) throws IOException
+        {
+            out.writeBytes(s);
+        }
+
+        public void writeChar(int v) throws IOException
+        {
+            out.writeChar(v);
+        }
+
+        public void writeChars(String s) throws IOException
+        {
+            out.writeChars(s);
+        }
+
+        public void writeDouble(double v) throws IOException
+        {
+            out.writeDouble(v);
+        }
+
+        public void writeFloat(float v) throws IOException
+        {
+            out.writeFloat(v);
+        }
+
+        public void writeInt(int v) throws IOException
+        {
+            out.writeInt(v);
+        }
+
+        public void writeLong(long v) throws IOException
+        {
+            out.writeLong(v);
+        }
+
+        public void writeShort(int v) throws IOException
+        {
+            out.writeShort(v);
+        }
+
+        public void writeUTF(String s) throws IOException
+        {
+            out.writeUTF(s);
+        }
+
+        public DataOutputPlusAdapter(DataOutput out)
+        {
+            this.out = out;
+        }
+
+        /**
+         * Writes the bytes between the buffer's position and limit; the buffer's position is left unchanged.
+         */
+        public void write(ByteBuffer buffer) throws IOException
+        {
+            if (buffer.hasArray())
+            {
+                // the readable bytes start at arrayOffset() + position(), not at arrayOffset()
+                out.write(buffer.array(), buffer.arrayOffset() + buffer.position(), buffer.remaining());
+            }
+            else
+            {
+                // no accessible backing array (e.g. direct buffer): copy through a duplicate
+                byte[] bytes = new byte[buffer.remaining()];
+                buffer.duplicate().get(bytes);
+                out.write(bytes);
+            }
+        }
+
+        public void write(Memory memory) throws IOException
+        {
+            throw new UnsupportedOperationException("IMPLEMENT ME");
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/cache/RowCacheKey.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/RowCacheKey.java b/src/java/org/apache/cassandra/cache/RowCacheKey.java
index af2d4d4..ccb85d8 100644
--- a/src/java/org/apache/cassandra/cache/RowCacheKey.java
+++ b/src/java/org/apache/cassandra/cache/RowCacheKey.java
@@ -33,6 +33,12 @@ public class RowCacheKey implements CacheKey, Comparable<RowCacheKey>
 
     private static final long EMPTY_SIZE = ObjectSizes.measure(new RowCacheKey(null, ByteBufferUtil.EMPTY_BYTE_BUFFER));
 
+    public RowCacheKey(UUID cfId, byte[] key)
+    {
+        this.cfId = cfId;
+        this.key = key;
+    }
+
     public RowCacheKey(UUID cfId, DecoratedKey key)
     {
         this(cfId, key.getKey());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/cache/SerializingCache.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/SerializingCache.java b/src/java/org/apache/cassandra/cache/SerializingCache.java
index ca65fcc..911b500 100644
--- a/src/java/org/apache/cassandra/cache/SerializingCache.java
+++ b/src/java/org/apache/cassandra/cache/SerializingCache.java
@@ -18,7 +18,7 @@
 package org.apache.cassandra.cache;
 
 import java.io.IOException;
-import java.util.Set;
+import java.util.Iterator;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -264,14 +264,14 @@ public class SerializingCache<K, V> implements ICache<K, V>
         mem.unreference();
     }
 
-    public Set<K> keySet()
+    public Iterator<K> keyIterator()
     {
-        return map.keySet();
+        return map.keySet().iterator();
     }
 
-    public Set<K> hotKeySet(int n)
+    public Iterator<K> hotKeyIterator(int n)
     {
-        return
map.descendingKeySetWithLimit(n);
+        return map.descendingKeySetWithLimit(n).iterator();
     }
 
     public boolean containsKey(K key)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java b/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
index a058872..f540322 100644
--- a/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
+++ b/src/java/org/apache/cassandra/cache/SerializingCacheProvider.java
@@ -20,17 +20,18 @@ package org.apache.cassandra.cache;
 import java.io.DataInput;
 import java.io.IOException;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.net.MessagingService;
 
-public class SerializingCacheProvider
+public class SerializingCacheProvider implements CacheProvider<RowCacheKey, IRowCacheEntry>
 {
-    public ICache<RowCacheKey, IRowCacheEntry> create(long capacity)
+    public ICache<RowCacheKey, IRowCacheEntry> create()
     {
-        return SerializingCache.create(capacity, new RowCacheSerializer());
+        return SerializingCache.create(DatabaseDescriptor.getRowCacheSizeInMB() * 1024 * 1024, new RowCacheSerializer());
     }
 
     // Package protected for tests
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/config/Config.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/Config.java b/src/java/org/apache/cassandra/config/Config.java
index 33d2bb2..f42c980 100644
--- a/src/java/org/apache/cassandra/config/Config.java
+++ b/src/java/org/apache/cassandra/config/Config.java
@@ -192,6 +192,7 @@ public class Config
     public volatile int key_cache_save_period = 14400;
     public volatile int key_cache_keys_to_save = Integer.MAX_VALUE;
 
+    public String row_cache_class_name = "org.apache.cassandra.cache.OHCProvider";
     public long row_cache_size_in_mb = 0;
     public volatile int row_cache_save_period = 0;
     public volatile int row_cache_keys_to_save = Integer.MAX_VALUE;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
index 6d626da..8cc2da4 100644
--- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
+++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
@@ -1423,6 +1423,11 @@ public class DatabaseDescriptor
         conf.key_cache_keys_to_save = keyCacheKeysToSave;
     }
 
+    public static String getRowCacheClassName()
+    {
+        return conf.row_cache_class_name;
+    }
+
     public static long getRowCacheSizeInMB()
     {
         return conf.row_cache_size_in_mb;
@@ -1448,6 +1453,11 @@ public class DatabaseDescriptor
         return counterCacheSizeInMB;
     }
 
+    public static void setRowCacheKeysToSave(int rowCacheKeysToSave)
+    {
+        conf.row_cache_keys_to_save = rowCacheKeysToSave;
+    }
+
     public static int getCounterCacheSavePeriod()
     {
         return conf.counter_cache_save_period;
@@ -1473,11 +1483,6 @@ public class DatabaseDescriptor
         return memoryAllocator;
     }
 
-    public static void setRowCacheKeysToSave(int rowCacheKeysToSave)
-    {
-        conf.row_cache_keys_to_save = rowCacheKeysToSave;
-    }
-
     public static int getStreamingSocketTimeout()
     {
         return conf.streaming_socket_timeout_in_ms;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
index 50991f2..82c8151 100644
--- a/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
+++ b/src/java/org/apache/cassandra/config/YamlConfigurationLoader.java
@@ -102,7 +102,7 @@ public class YamlConfigurationLoader implements ConfigurationLoader
         }
 
         logConfig(configBytes);
-        
+
         org.yaml.snakeyaml.constructor.Constructor constructor = new org.yaml.snakeyaml.constructor.Constructor(Config.class);
         TypeDescription seedDesc = new TypeDescription(SeedProviderDef.class);
         seedDesc.putMapPropertyType("parameters", String.class, String.class);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 002238c..c2ee0ac 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1961,19 +1961,23 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
     {
         Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName());
 
-        for (RowCacheKey key : CacheService.instance.rowCache.getKeySet())
+        for (Iterator<RowCacheKey> keyIter = CacheService.instance.rowCache.keyIterator();
+             keyIter.hasNext(); )
         {
+            RowCacheKey key = keyIter.next();
             DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.key));
-            if (key.cfId == metadata.cfId && !Range.isInRanges(dk.getToken(), ranges))
+            if (key.cfId.equals(metadata.cfId) && !Range.isInRanges(dk.getToken(), ranges))
                 invalidateCachedRow(dk);
         }
 
         if (metadata.isCounter())
         {
-            for (CounterCacheKey key : CacheService.instance.counterCache.getKeySet())
+            for (Iterator<CounterCacheKey> keyIter = CacheService.instance.counterCache.keyIterator();
+                 keyIter.hasNext(); )
             {
+                CounterCacheKey key = keyIter.next();
                 DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.partitionKey));
-                if (key.cfId == metadata.cfId && !Range.isInRanges(dk.getToken(), ranges))
+                if (key.cfId.equals(metadata.cfId) && !Range.isInRanges(dk.getToken(), ranges))
                     CacheService.instance.counterCache.remove(key);
             }
         }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/src/java/org/apache/cassandra/service/CacheService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CacheService.java b/src/java/org/apache/cassandra/service/CacheService.java
index 2ffd954..fb8153c 100644
--- a/src/java/org/apache/cassandra/service/CacheService.java
+++ b/src/java/org/apache/cassandra/service/CacheService.java
@@ -33,6 +33,7 @@ import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import com.google.common.util.concurrent.Futures;
+
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -132,10 +133,22 @@ public class CacheService implements CacheServiceMBean
     {
         logger.info("Initializing row cache with capacity of {} MBs", DatabaseDescriptor.getRowCacheSizeInMB());
 
-        long rowCacheInMemoryCapacity = DatabaseDescriptor.getRowCacheSizeInMB() * 1024 * 1024;
+        CacheProvider<RowCacheKey, IRowCacheEntry> cacheProvider;
+        String cacheProviderClassName = DatabaseDescriptor.getRowCacheSizeInMB() > 0
+                                        ? DatabaseDescriptor.getRowCacheClassName() : "org.apache.cassandra.cache.NopCacheProvider";
+        try
+        {
+            Class<CacheProvider<RowCacheKey, IRowCacheEntry>> cacheProviderClass =
+                (Class<CacheProvider<RowCacheKey, IRowCacheEntry>>) Class.forName(cacheProviderClassName);
+            cacheProvider = cacheProviderClass.newInstance();
+        }
+        catch (Exception e)
+        {
+            throw new RuntimeException("Cannot find configured row cache provider class " + cacheProviderClassName, e); // name the class actually attempted and keep the cause
+        }
 
         // cache object
-        ICache<RowCacheKey, IRowCacheEntry> rc = new SerializingCacheProvider().create(rowCacheInMemoryCapacity);
+        ICache<RowCacheKey, IRowCacheEntry> rc = cacheProvider.create();
         AutoSavingCache<RowCacheKey, IRowCacheEntry> rowCache = new AutoSavingCache<>(rc, CacheType.ROW_CACHE, new RowCacheSerializer());
 
         int rowCacheKeysToSave = DatabaseDescriptor.getRowCacheKeysToSave();
@@ -285,7 +298,7 @@ public class CacheService implements CacheServiceMBean
 
     public void invalidateKeyCacheForCf(UUID cfId)
     {
-        Iterator<KeyCacheKey> keyCacheIterator = keyCache.getKeySet().iterator();
+        Iterator<KeyCacheKey> keyCacheIterator = keyCache.keyIterator();
         while (keyCacheIterator.hasNext())
         {
             KeyCacheKey key = keyCacheIterator.next();
@@ -301,7 +314,7 @@ public class CacheService implements CacheServiceMBean
 
     public void invalidateRowCacheForCf(UUID cfId)
     {
-        Iterator<RowCacheKey> rowCacheIterator = rowCache.getKeySet().iterator();
+        Iterator<RowCacheKey> rowCacheIterator = rowCache.keyIterator();
         while (rowCacheIterator.hasNext())
         {
             RowCacheKey rowCacheKey = rowCacheIterator.next();
@@ -312,7 +325,7 @@ public class CacheService implements CacheServiceMBean
 
     public void invalidateCounterCacheForCf(UUID cfId)
     {
-        Iterator<CounterCacheKey> counterCacheIterator = counterCache.getKeySet().iterator();
+        Iterator<CounterCacheKey> counterCacheIterator = counterCache.keyIterator();
         while (counterCacheIterator.hasNext())
         {
             CounterCacheKey counterCacheKey = counterCacheIterator.next();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/test/conf/cassandra.yaml
----------------------------------------------------------------------
diff --git a/test/conf/cassandra.yaml b/test/conf/cassandra.yaml
index ec988e2..307ca8c 100644
--- a/test/conf/cassandra.yaml
+++ b/test/conf/cassandra.yaml
@@ -36,3 +36,5 @@ server_encryption_options:
 incremental_backups: true
 concurrent_compactors: 4
 compaction_throughput_mb_per_sec: 0
+row_cache_class_name: org.apache.cassandra.cache.OHCProvider
+row_cache_size_in_mb: 16
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/test/unit/org/apache/cassandra/db/KeyCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/KeyCacheTest.java b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
index e31b439..c370e4f 100644
--- a/test/unit/org/apache/cassandra/db/KeyCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/KeyCacheTest.java
@@ -18,6 +18,7 @@
 package org.apache.cassandra.db;
 
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
@@ -88,8 +89,10 @@ public class KeyCacheTest
 
         // really? our caches don't implement the map interface? (hence no .addAll)
         Map<KeyCacheKey, RowIndexEntry> savedMap = new HashMap<KeyCacheKey, RowIndexEntry>();
-        for (KeyCacheKey k : CacheService.instance.keyCache.getKeySet())
+        for (Iterator<KeyCacheKey> iter = CacheService.instance.keyCache.keyIterator();
+             iter.hasNext();)
         {
+            KeyCacheKey k = iter.next();
             if (k.desc.ksname.equals(KEYSPACE1) && k.desc.cfname.equals(COLUMN_FAMILY2))
                 savedMap.put(k, CacheService.instance.keyCache.get(k));
         }
@@ -207,8 +210,10 @@ public class KeyCacheTest
     private void assertKeyCacheSize(int expected, String keyspace, String columnFamily)
     {
         int size = 0;
-        for (KeyCacheKey k : CacheService.instance.keyCache.getKeySet())
+        for (Iterator<KeyCacheKey> iter = CacheService.instance.keyCache.keyIterator();
+             iter.hasNext();)
         {
+            KeyCacheKey k = iter.next();
             if (k.desc.ksname.equals(keyspace) && k.desc.cfname.equals(columnFamily))
                 size++;
         }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/dfd78d0e/test/unit/org/apache/cassandra/db/RowCacheTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/RowCacheTest.java b/test/unit/org/apache/cassandra/db/RowCacheTest.java
index 7d5799a..3d5617f 100644
--- a/test/unit/org/apache/cassandra/db/RowCacheTest.java
+++ b/test/unit/org/apache/cassandra/db/RowCacheTest.java
@@ -156,9 +156,9 @@ public class RowCacheTest
         rowCacheLoad(100, Integer.MAX_VALUE, 1000);
 
         ColumnFamilyStore store = Keyspace.open(KEYSPACE_CACHED).getColumnFamilyStore(CF_CACHED);
-        assertEquals(CacheService.instance.rowCache.getKeySet().size(), 100);
+        assertEquals(100, CacheService.instance.rowCache.size()); // expected value first, then actual
         store.cleanupCache();
-        assertEquals(CacheService.instance.rowCache.getKeySet().size(), 100);
+        assertEquals(100, CacheService.instance.rowCache.size());
         TokenMetadata tmd = StorageService.instance.getTokenMetadata();
         byte[] tk1, tk2;
         tk1 = "key1000".getBytes();
@@ -166,7 +166,7 @@ public class RowCacheTest
         tmd.updateNormalToken(new BytesToken(tk1), InetAddress.getByName("127.0.0.1"));
         tmd.updateNormalToken(new BytesToken(tk2), InetAddress.getByName("127.0.0.2"));
         store.cleanupCache();
-        assertEquals(CacheService.instance.rowCache.getKeySet().size(), 50);
+        assertEquals(50, CacheService.instance.rowCache.size());
         CacheService.instance.setRowCacheCapacityInMB(0);
     }
 
@@ -259,19 +259,19 @@ public class RowCacheTest
 
         // empty the cache
         CacheService.instance.invalidateRowCache();
-        assert CacheService.instance.rowCache.size() == 0;
+        assertEquals(0, CacheService.instance.rowCache.size());
 
         // insert data and fill the cache
         SchemaLoader.insertData(KEYSPACE_CACHED, CF_CACHED, offset, totalKeys);
         SchemaLoader.readData(KEYSPACE_CACHED, CF_CACHED, offset, totalKeys);
-        assert CacheService.instance.rowCache.size() == totalKeys;
+        assertEquals(totalKeys, CacheService.instance.rowCache.size());
 
         // force the cache to disk
         CacheService.instance.rowCache.submitWrite(keysToSave).get();
 
         // empty the cache again to make sure values came from disk
         CacheService.instance.invalidateRowCache();
-        assert CacheService.instance.rowCache.size() == 0;
-        assert CacheService.instance.rowCache.loadSaved(store) == (keysToSave == Integer.MAX_VALUE ? totalKeys : keysToSave);
+        assertEquals(0, CacheService.instance.rowCache.size());
+        assertEquals(keysToSave == Integer.MAX_VALUE ? totalKeys : keysToSave, CacheService.instance.rowCache.loadSaved(store));
     }
 }
