Repository: cassandra Updated Branches: refs/heads/trunk a03ac8d07 -> 346d8fb6d
Update syntax for changing caching options. Patch by marcuse, reviewed by slebresne for CASSANDRA-6745. Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/da444a69 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/da444a69 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/da444a69 Branch: refs/heads/trunk Commit: da444a6903ddd8d02e7b251c8b38faf11d3a5c93 Parents: ad5169d Author: Marcus Eriksson <marc...@apache.org> Authored: Wed Mar 12 14:04:47 2014 +0100 Committer: Marcus Eriksson <marc...@apache.org> Committed: Wed Mar 12 14:06:05 2014 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + pylib/cqlshlib/cql3handling.py | 13 +- .../apache/cassandra/cache/CachingOptions.java | 288 +++++++++++++++++++ .../org/apache/cassandra/cli/CliClient.java | 3 +- .../org/apache/cassandra/config/CFMetaData.java | 110 ++----- .../cassandra/cql/AlterTableStatement.java | 4 +- .../cql/CreateColumnFamilyStatement.java | 4 +- .../cassandra/cql3/statements/CFPropDefs.java | 25 +- .../apache/cassandra/db/ColumnFamilyStore.java | 18 +- .../org/apache/cassandra/db/SystemKeyspace.java | 33 ++- .../cassandra/io/sstable/SSTableReader.java | 6 +- .../unit/org/apache/cassandra/SchemaLoader.java | 14 +- 12 files changed, 391 insertions(+), 128 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/da444a69/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index d8eb3a1..61e17e3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -11,6 +11,7 @@ * Improve handling of range tombstone for wide partitions (CASSANDRA-6446) * Fix ClassCastException for compact table with composites (CASSANDRA-6738) * Fix potentially repairing with wrong nodes (CASSANDRA-6808) + * Change caching option syntax (CASSANDRA-6745) Merged from 2.0: * Fix saving triggers to schema (CASSANDRA-6789) * Fix trigger mutations when base mutation list is immutable (CASSANDRA-6790) http://git-wip-us.apache.org/repos/asf/cassandra/blob/da444a69/pylib/cqlshlib/cql3handling.py ---------------------------------------------------------------------- diff --git a/pylib/cqlshlib/cql3handling.py b/pylib/cqlshlib/cql3handling.py index 158e60d..3522d1c 100644 --- a/pylib/cqlshlib/cql3handling.py +++ b/pylib/cqlshlib/cql3handling.py @@ -62,8 +62,6 @@ class Cql3ParsingRuleSet(CqlParsingRuleSet): columnfamily_layout_options = ( ('bloom_filter_fp_chance', None), - ('caching', None), - ('rows_per_partition_to_cache', None), ('comment', None), ('dclocal_read_repair_chance', 'local_read_repair_chance'), ('gc_grace_seconds', None), @@ -83,6 +81,8 @@ class Cql3ParsingRuleSet(CqlParsingRuleSet): ('class', 'min_threshold', 'max_threshold')), ('compression', 'compression_parameters', ('sstable_compression', 'chunk_length_kb', 'crc_check_chance')), + ('caching', None, + ('rows_per_partition', 'keys')), ) obsolete_cf_options = () @@ -463,6 +463,8 @@ def cf_prop_val_mapkey_completer(ctxt, cass): pairsseen = dict(zip(keysseen, valsseen)) if optname == 'compression': return map(escape_value, set(subopts).difference(keysseen)) + if optname == 'caching': + return map(escape_value, set(subopts).difference(keysseen)) if optname == 'compaction': opts = set(subopts) try: @@ -488,6 +490,11 @@ def cf_prop_val_mapval_completer(ctxt, cass): if key == 'sstable_compression': return map(escape_value, CqlRuleSet.available_compression_classes) return [Hint('<option_value>')] + elif opt == 'caching': + if key == 'rows_per_partition': + return [Hint('ALL'), Hint('NONE'), Hint('#rows_per_partition')] + elif key == 'keys': + return [Hint('ALL'), Hint('NONE')] return () def cf_prop_val_mapender_completer(ctxt, cass): @@ -1187,7 +1194,7 @@ class CqlTableDef: for attr, val in layout.items(): setattr(cf, attr.encode('ascii'), val) cf.comparator = lookup_casstype(cf.comparator) - for attr in ('compaction_strategy_options', 'compression_parameters'): + for attr in ('compaction_strategy_options', 'compression_parameters', 'caching'): setattr(cf, attr, json.loads(getattr(cf, attr))) # deal with columns, filter out empty column names (see CASSANDRA-6139) http://git-wip-us.apache.org/repos/asf/cassandra/blob/da444a69/src/java/org/apache/cassandra/cache/CachingOptions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cache/CachingOptions.java b/src/java/org/apache/cassandra/cache/CachingOptions.java new file mode 100644 index 0000000..6eeaa37 --- /dev/null +++ b/src/java/org/apache/cassandra/cache/CachingOptions.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.cache; + + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.commons.lang3.StringUtils; + +import org.apache.cassandra.exceptions.ConfigurationException; +import static org.apache.cassandra.utils.FBUtilities.fromJsonMap; + +/* +CQL: { 'keys' : 'ALL|NONE', 'rows_per_partition': '200|NONE|ALL' } + */ +public class CachingOptions +{ + public static final CachingOptions KEYS_ONLY = new CachingOptions(new KeyCache(KeyCache.Type.ALL), new RowCache(RowCache.Type.NONE)); + public static final CachingOptions ALL = new CachingOptions(new KeyCache(KeyCache.Type.ALL), new RowCache(RowCache.Type.ALL)); + public static final CachingOptions ROWS_ONLY = new CachingOptions(new KeyCache(KeyCache.Type.NONE), new RowCache(RowCache.Type.ALL)); + public static final CachingOptions NONE = new CachingOptions(new KeyCache(KeyCache.Type.NONE), new RowCache(RowCache.Type.NONE)); + + public final KeyCache keyCache; + public final RowCache rowCache; + private static final Set<String> legacyOptions = new HashSet<>(Arrays.asList("ALL", "NONE", "KEYS_ONLY", "ROWS_ONLY")); + + public CachingOptions(KeyCache kc, RowCache rc) + { + this.keyCache = kc; + this.rowCache = rc; + } + + public static CachingOptions fromString(String cache) throws ConfigurationException + { + if (legacyOptions.contains(cache.toUpperCase())) + return fromLegacyOption(cache.toUpperCase()); + return fromMap(fromJsonMap(cache)); + } + + public static CachingOptions fromMap(Map<String, String> cacheConfig) throws ConfigurationException + { + validateCacheConfig(cacheConfig); + if (!cacheConfig.containsKey("keys") && !cacheConfig.containsKey("rows_per_partition")) + return CachingOptions.NONE; + if (!cacheConfig.containsKey("keys")) + return new CachingOptions(new KeyCache(KeyCache.Type.NONE), RowCache.fromString(cacheConfig.get("rows_per_partition"))); + if (!cacheConfig.containsKey("rows_per_partition")) + return CachingOptions.KEYS_ONLY; + + return new CachingOptions(KeyCache.fromString(cacheConfig.get("keys")), RowCache.fromString(cacheConfig.get("rows_per_partition"))); + } + + private static void validateCacheConfig(Map<String, String> cacheConfig) throws ConfigurationException + { + for (Map.Entry<String, String> entry : cacheConfig.entrySet()) + { + String value = entry.getValue().toUpperCase(); + if (entry.getKey().equals("keys")) + { + if (!(value.equals("ALL") || value.equals("NONE"))) + { + throw new ConfigurationException("'keys' can only have values 'ALL' or 'NONE'"); + } + } + else if (entry.getKey().equals("rows_per_partition")) + { + if (!(value.equals("ALL") || value.equals("NONE") || StringUtils.isNumeric(value))) + { + throw new ConfigurationException("'rows_per_partition' can only have values 'ALL', 'NONE' or be numeric."); + } + } + else + throw new ConfigurationException("Only supported CachingOptions parameters are 'keys' and 'rows_per_partition'"); + } + } + + @Override + public String toString() + { + return String.format("{\"keys\":\"%s\", \"rows_per_partition\":\"%s\"}", keyCache.toString(), rowCache.toString()); + } + + private static CachingOptions fromLegacyOption(String cache) + { + if (cache.equals("ALL")) + return ALL; + if (cache.equals("KEYS_ONLY")) + return KEYS_ONLY; + if (cache.equals("ROWS_ONLY")) + return ROWS_ONLY; + return NONE; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + CachingOptions o2 = (CachingOptions) o; + + if (!keyCache.equals(o2.keyCache)) return false; + if (!rowCache.equals(o2.rowCache)) return false; + + return true; + } + + @Override + public int hashCode() + { + int result = keyCache.hashCode(); + result = 31 * result + rowCache.hashCode(); + return result; + } + + public static boolean isLegacy(String CachingOptions) + { + return legacyOptions.contains(CachingOptions.toUpperCase()); + } + + public static CachingOptions fromThrift(String caching, String cellsPerRow) throws ConfigurationException + { + + RowCache rc = new RowCache(RowCache.Type.NONE); + KeyCache kc = new KeyCache(KeyCache.Type.ALL); + // if we get a caching string from thrift it is legacy, "ALL", "KEYS_ONLY" etc, fromString handles those + if (caching != null) + { + CachingOptions givenOptions = CachingOptions.fromString(caching); + rc = givenOptions.rowCache; + kc = givenOptions.keyCache; + } + // if we get cells_per_row from thrift, it is either "ALL" or "<number of cells to cache>". + if (cellsPerRow != null && rc.isEnabled()) + rc = RowCache.fromString(cellsPerRow); + return new CachingOptions(kc, rc); + } + + public String toThriftCaching() + { + if (rowCache.isEnabled() && keyCache.isEnabled()) + return "ALL"; + if (rowCache.isEnabled()) + return "ROWS_ONLY"; + if (keyCache.isEnabled()) + return "KEYS_ONLY"; + return "NONE"; + } + + public String toThriftCellsPerRow() + { + if (rowCache.cacheFullPartitions()) + return "ALL"; + return String.valueOf(rowCache.rowsToCache); + } + + + public static class KeyCache + { + public final Type type; + public KeyCache(Type type) + { + this.type = type; + } + + public enum Type + { + ALL, NONE + } + public static KeyCache fromString(String keyCache) + { + return new KeyCache(Type.valueOf(keyCache.toUpperCase())); + } + + public boolean isEnabled() + { + return type.equals(Type.ALL); + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + KeyCache keyCache = (KeyCache) o; + + if (type != keyCache.type) return false; + + return true; + } + + @Override + public int hashCode() + { + return type.hashCode(); + } + @Override + public String toString() + { + return type.toString(); + } + } + + public static class RowCache + { + public final Type type; + public final int rowsToCache; + + public RowCache(Type type) + { + this(type, type.equals(Type.ALL) ? Integer.MAX_VALUE : 0); + } + public RowCache(Type type, int rowsToCache) + { + this.type = type; + this.rowsToCache = rowsToCache; + } + + public enum Type + { + ALL, NONE, HEAD + } + + public static RowCache fromString(String rowCache) + { + if (rowCache == null || rowCache.equalsIgnoreCase("none")) + return new RowCache(Type.NONE, 0); + else if (rowCache.equalsIgnoreCase("all")) + return new RowCache(Type.ALL, Integer.MAX_VALUE); + return new RowCache(Type.HEAD, Integer.parseInt(rowCache)); + } + public boolean isEnabled() + { + return type.equals(Type.ALL) || type.equals(Type.HEAD); + } + public boolean cacheFullPartitions() + { + return type.equals(Type.ALL); + } + @Override + public String toString() + { + if (type.equals(Type.ALL)) return "ALL"; + if (type.equals(Type.NONE)) return "NONE"; + return String.valueOf(rowsToCache); + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + RowCache rowCache = (RowCache) o; + + if (rowsToCache != rowCache.rowsToCache) return false; + if (type != rowCache.type) return false; + + return true; + } + + @Override + public int hashCode() + { + int result = type.hashCode(); + result = 31 * result + rowsToCache; + return result; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/da444a69/src/java/org/apache/cassandra/cli/CliClient.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cli/CliClient.java b/src/java/org/apache/cassandra/cli/CliClient.java index b30bee1..ab6a221 100644 --- a/src/java/org/apache/cassandra/cli/CliClient.java +++ b/src/java/org/apache/cassandra/cli/CliClient.java @@ -1350,7 +1350,7 @@ public class CliClient cfDef.setCaching(CliUtils.unescapeSQLString(mValue)); break; case CELLS_PER_ROW_TO_CACHE: - cfDef.setCells_per_row_to_cache(mValue); + cfDef.setCells_per_row_to_cache(CliUtils.unescapeSQLString(mValue)); break; case DEFAULT_TIME_TO_LIVE: cfDef.setDefault_time_to_live(Integer.parseInt(mValue)); @@ -1853,7 +1853,6 @@ public class CliClient writeAttrRaw(output, false, "compaction_strategy_options", cOptions.toString()); } - if (!StringUtils.isEmpty(cfDef.comment)) writeAttr(output, false, "comment", cfDef.comment); http://git-wip-us.apache.org/repos/asf/cassandra/blob/da444a69/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index ac5dea7..f38dd5e 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -30,9 +30,11 @@ import com.google.common.collect.AbstractIterator; import com.google.common.collect.Iterables; import com.google.common.collect.MapDifference; import com.google.common.collect.Maps; + +import org.apache.cassandra.cache.CachingOptions; import org.apache.cassandra.db.composites.*; + import org.apache.commons.lang3.ArrayUtils; -import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.commons.lang3.builder.ToStringBuilder; import org.slf4j.Logger; @@ -80,13 +82,12 @@ public final class CFMetaData public final static int DEFAULT_MIN_COMPACTION_THRESHOLD = 4; public final static int DEFAULT_MAX_COMPACTION_THRESHOLD = 32; public final static Class<? extends AbstractCompactionStrategy> DEFAULT_COMPACTION_STRATEGY_CLASS = SizeTieredCompactionStrategy.class; - public final static Caching DEFAULT_CACHING_STRATEGY = Caching.KEYS_ONLY; + public final static CachingOptions DEFAULT_CACHING_STRATEGY = CachingOptions.KEYS_ONLY; public final static int DEFAULT_DEFAULT_TIME_TO_LIVE = 0; public final static SpeculativeRetry DEFAULT_SPECULATIVE_RETRY = new SpeculativeRetry(SpeculativeRetry.RetryType.PERCENTILE, 0.99); public final static int DEFAULT_MIN_INDEX_INTERVAL = 128; public final static int DEFAULT_MAX_INDEX_INTERVAL = 2048; public final static boolean DEFAULT_POPULATE_IO_CACHE_ON_FLUSH = false; - public final static RowsPerPartitionToCache DEFAULT_ROWS_PER_PARTITION_TO_CACHE = new RowsPerPartitionToCache(100, RowsPerPartitionToCache.Type.HEAD); // Note that this is the default only for user created tables public final static String DEFAULT_COMPRESSOR = LZ4Compressor.class.getCanonicalName(); @@ -300,65 +301,6 @@ public final class CFMetaData + "PRIMARY KEY (id)" + ") WITH COMMENT='show all compaction history' AND DEFAULT_TIME_TO_LIVE=604800"); - public enum Caching - { - ALL, KEYS_ONLY, ROWS_ONLY, NONE; - - public static Caching fromString(String cache) throws ConfigurationException - { - try - { - return valueOf(cache.toUpperCase()); - } - catch (IllegalArgumentException e) - { - throw new ConfigurationException(String.format("%s not found, available types: %s.", cache, StringUtils.join(values(), ", "))); - } - } - } - - public static class RowsPerPartitionToCache - { - public enum Type - { - ALL, HEAD - } - public final int rowsToCache; - public final Type type; - - private RowsPerPartitionToCache(int rowsToCache, Type type) - { - this.rowsToCache = rowsToCache; - this.type = type; - } - - public static RowsPerPartitionToCache fromString(String rpptc) - { - if (rpptc.equalsIgnoreCase("all")) - return new RowsPerPartitionToCache(Integer.MAX_VALUE, Type.ALL); - return new RowsPerPartitionToCache(Integer.parseInt(rpptc), Type.HEAD); - } - - public boolean cacheFullPartitions() - { - return type == Type.ALL; - } - - public String toString() - { - if (rowsToCache == Integer.MAX_VALUE) - return "ALL"; - return String.valueOf(rowsToCache); - } - - public boolean equals(Object rhs) - { - if (!(rhs instanceof RowsPerPartitionToCache)) - return false; - RowsPerPartitionToCache rppc = (RowsPerPartitionToCache)rhs; - return rowsToCache == rppc.rowsToCache && type == rppc.type; - } - } public static class SpeculativeRetry { @@ -453,7 +395,7 @@ public final class CFMetaData private volatile int minCompactionThreshold = DEFAULT_MIN_COMPACTION_THRESHOLD; private volatile int maxCompactionThreshold = DEFAULT_MAX_COMPACTION_THRESHOLD; private volatile Double bloomFilterFpChance = null; - private volatile Caching caching = DEFAULT_CACHING_STRATEGY; + private volatile CachingOptions caching = DEFAULT_CACHING_STRATEGY; private volatile int minIndexInterval = DEFAULT_MIN_INDEX_INTERVAL; private volatile int maxIndexInterval = DEFAULT_MAX_INDEX_INTERVAL; private int memtableFlushPeriod = 0; @@ -463,7 +405,6 @@ public final class CFMetaData private volatile Map<ColumnIdentifier, Long> droppedColumns = new HashMap<>(); private volatile Map<String, TriggerDefinition> triggers = new HashMap<>(); private volatile boolean isPurged = false; - private volatile RowsPerPartitionToCache rowsPerPartitionToCache = DEFAULT_ROWS_PER_PARTITION_TO_CACHE; /* * All CQL3 columns definition are stored in the columnMetadata map. * On top of that, we keep separated collection of each kind of definition, to @@ -500,7 +441,7 @@ public final class CFMetaData public CFMetaData compactionStrategyOptions(Map<String, String> prop) {compactionStrategyOptions = prop; return this;} public CFMetaData compressionParameters(CompressionParameters prop) {compressionParameters = prop; return this;} public CFMetaData bloomFilterFpChance(Double prop) {bloomFilterFpChance = prop; return this;} - public CFMetaData caching(Caching prop) {caching = prop; return this;} + public CFMetaData caching(CachingOptions prop) {caching = prop; return this;} public CFMetaData minIndexInterval(int prop) {minIndexInterval = prop; return this;} public CFMetaData maxIndexInterval(int prop) {maxIndexInterval = prop; return this;} public CFMetaData memtableFlushPeriod(int prop) {memtableFlushPeriod = prop; return this;} @@ -509,7 +450,6 @@ public final class CFMetaData public CFMetaData populateIoCacheOnFlush(boolean prop) {populateIoCacheOnFlush = prop; return this;} public CFMetaData droppedColumns(Map<ColumnIdentifier, Long> cols) {droppedColumns = cols; return this;} public CFMetaData triggers(Map<String, TriggerDefinition> prop) {triggers = prop; return this;} - public CFMetaData rowsPerPartitionToCache(RowsPerPartitionToCache prop) { rowsPerPartitionToCache = prop; return this; } /** * Create new ColumnFamily metadata with generated random ID. @@ -626,9 +566,9 @@ public final class CFMetaData { // Depends on parent's cache setting, turn on its index CF's cache. // Row caching is never enabled; see CASSANDRA-5732 - Caching indexCaching = parent.getCaching() == Caching.ALL || parent.getCaching() == Caching.KEYS_ONLY - ? Caching.KEYS_ONLY - : Caching.NONE; + CachingOptions indexCaching = parent.getCaching().keyCache.isEnabled() + ? CachingOptions.KEYS_ONLY + : CachingOptions.NONE; return new CFMetaData(parent.ksName, parent.indexColumnFamilyName(info), ColumnFamilyType.Standard, indexComparator, parent.cfId) .keyValidator(info.type) @@ -697,7 +637,6 @@ public final class CFMetaData .populateIoCacheOnFlush(oldCFMD.populateIoCacheOnFlush) .droppedColumns(new HashMap<>(oldCFMD.droppedColumns)) .triggers(new HashMap<>(oldCFMD.triggers)) - .rowsPerPartitionToCache(oldCFMD.rowsPerPartitionToCache) .rebuild(); } @@ -884,7 +823,7 @@ public final class CFMetaData : bloomFilterFpChance; } - public Caching getCaching() + public CachingOptions getCaching() { return caching; } @@ -899,11 +838,6 @@ public final class CFMetaData return maxIndexInterval; } - public RowsPerPartitionToCache getRowsPerPartitionToCache() - { - return rowsPerPartitionToCache; - } - public SpeculativeRetry getSpeculativeRetry() { return speculativeRetry; @@ -961,8 +895,7 @@ public final class CFMetaData && Objects.equal(speculativeRetry, other.speculativeRetry) && Objects.equal(populateIoCacheOnFlush, other.populateIoCacheOnFlush) && Objects.equal(droppedColumns, other.droppedColumns) - && Objects.equal(triggers, other.triggers) - && Objects.equal(rowsPerPartitionToCache, other.rowsPerPartitionToCache); + && Objects.equal(triggers, other.triggers); } @Override @@ -996,7 +929,6 @@ public final class CFMetaData .append(populateIoCacheOnFlush) .append(droppedColumns) .append(triggers) - .append(rowsPerPartitionToCache) .toHashCode(); } @@ -1067,9 +999,6 @@ public final class CFMetaData // ensure the max is at least as large as the min cf_def.setMax_index_interval(Math.max(cf_def.min_index_interval, CFMetaData.DEFAULT_MAX_INDEX_INTERVAL)); } - - if (!cf_def.isSetCells_per_row_to_cache()) - cf_def.setCells_per_row_to_cache(CFMetaData.DEFAULT_ROWS_PER_PARTITION_TO_CACHE.toString()); } public static CFMetaData fromThrift(org.apache.cassandra.thrift.CfDef cf_def) throws InvalidRequestException, ConfigurationException @@ -1112,8 +1041,8 @@ public final class CFMetaData newCFMD.bloomFilterFpChance(cf_def.bloom_filter_fp_chance); if (cf_def.isSetMemtable_flush_period_in_ms()) newCFMD.memtableFlushPeriod(cf_def.memtable_flush_period_in_ms); - if (cf_def.isSetCaching()) - newCFMD.caching(Caching.fromString(cf_def.caching)); + if (cf_def.isSetCaching() || cf_def.isSetCells_per_row_to_cache()) + newCFMD.caching(CachingOptions.fromThrift(cf_def.caching, cf_def.cells_per_row_to_cache)); if (cf_def.isSetRead_repair_chance()) newCFMD.readRepairChance(cf_def.read_repair_chance); if (cf_def.isSetDefault_time_to_live()) @@ -1130,8 +1059,6 @@ public final class CFMetaData newCFMD.populateIoCacheOnFlush(cf_def.populate_io_cache_on_flush); if (cf_def.isSetTriggers()) newCFMD.triggers(TriggerDefinition.fromThrift(cf_def.triggers)); - if (cf_def.isSetCells_per_row_to_cache()) - newCFMD.rowsPerPartitionToCache(RowsPerPartitionToCache.fromString(cf_def.cells_per_row_to_cache)); CompressionParameters cp = CompressionParameters.create(cf_def.compression_options); @@ -1229,7 +1156,6 @@ public final class CFMetaData minIndexInterval = cfm.minIndexInterval; maxIndexInterval = cfm.maxIndexInterval; memtableFlushPeriod = cfm.memtableFlushPeriod; - rowsPerPartitionToCache = cfm.rowsPerPartitionToCache; defaultTimeToLive = cfm.defaultTimeToLive; speculativeRetry = cfm.speculativeRetry; populateIoCacheOnFlush = cfm.populateIoCacheOnFlush; @@ -1378,8 +1304,8 @@ public final class CFMetaData def.setMin_index_interval(minIndexInterval); def.setMax_index_interval(maxIndexInterval); def.setMemtable_flush_period_in_ms(memtableFlushPeriod); - def.setCaching(caching.toString()); - def.setCells_per_row_to_cache(rowsPerPartitionToCache.toString()); + def.setCaching(caching.toThriftCaching()); + def.setCells_per_row_to_cache(caching.toThriftCellsPerRow()); def.setDefault_time_to_live(defaultTimeToLive); def.setSpeculative_retry(speculativeRetry.toString()); def.setTriggers(TriggerDefinition.toThrift(triggers)); @@ -1748,7 +1674,6 @@ public final class CFMetaData adder.add("memtable_flush_period_in_ms", memtableFlushPeriod); adder.add("caching", caching.toString()); - adder.add("rows_per_partition_to_cache", rowsPerPartitionToCache.toString()); adder.add("default_time_to_live", defaultTimeToLive); adder.add("compaction_strategy_class", compactionStrategyClass.getName()); adder.add("compression_parameters", json(compressionParameters.asThriftOptions())); @@ -1815,9 +1740,7 @@ public final class CFMetaData cfm.bloomFilterFpChance(result.getDouble("bloom_filter_fp_chance")); if (result.has("memtable_flush_period_in_ms")) cfm.memtableFlushPeriod(result.getInt("memtable_flush_period_in_ms")); - cfm.caching(Caching.valueOf(result.getString("caching"))); - if (result.has("rows_per_partition_to_cache")) - cfm.rowsPerPartitionToCache(RowsPerPartitionToCache.fromString(result.getString("rows_per_partition_to_cache"))); + cfm.caching(CachingOptions.fromString(result.getString("caching"))); if (result.has("default_time_to_live")) cfm.defaultTimeToLive(result.getInt("default_time_to_live")); if (result.has("speculative_retry")) @@ -2312,7 +2235,6 @@ public final class CFMetaData .append("populateIoCacheOnFlush", populateIoCacheOnFlush) .append("droppedColumns", droppedColumns) .append("triggers", triggers) - .append("rowsPerPartitionToCache", rowsPerPartitionToCache) .toString(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/da444a69/src/java/org/apache/cassandra/cql/AlterTableStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql/AlterTableStatement.java b/src/java/org/apache/cassandra/cql/AlterTableStatement.java index 2fda212..034f660 100644 --- a/src/java/org/apache/cassandra/cql/AlterTableStatement.java +++ b/src/java/org/apache/cassandra/cql/AlterTableStatement.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.cql; +import org.apache.cassandra.cache.CachingOptions; import org.apache.cassandra.config.*; import org.apache.cassandra.db.marshal.TypeParser; import org.apache.cassandra.exceptions.*; @@ -179,8 +180,7 @@ public class AlterTableStatement throw new ConfigurationException("Disabling compaction by setting compaction thresholds to 0 has been deprecated, set the compaction option 'enabled' to false instead."); cfm.minCompactionThreshold(minCompactionThreshold); cfm.maxCompactionThreshold(maxCompactionThreshold); - cfm.caching(CFMetaData.Caching.fromString(cfProps.getPropertyString(CFPropDefs.KW_CACHING, cfm.getCaching().toString()))); - cfm.rowsPerPartitionToCache(CFMetaData.RowsPerPartitionToCache.fromString(cfProps.getPropertyString(CFPropDefs.KW_ROWS_PER_PARTITION_TO_CACHE, cfm.getRowsPerPartitionToCache().toString()))); + cfm.caching(CachingOptions.fromString(cfProps.getPropertyString(CFPropDefs.KW_CACHING, cfm.getCaching().toString()))); cfm.defaultTimeToLive(cfProps.getPropertyInt(CFPropDefs.KW_DEFAULT_TIME_TO_LIVE, cfm.getDefaultTimeToLive())); cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(cfProps.getPropertyString(CFPropDefs.KW_SPECULATIVE_RETRY, cfm.getSpeculativeRetry().toString()))); cfm.populateIoCacheOnFlush(cfProps.getPropertyBoolean(CFPropDefs.KW_POPULATE_IO_CACHE_ON_FLUSH, cfm.populateIoCacheOnFlush())); http://git-wip-us.apache.org/repos/asf/cassandra/blob/da444a69/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java index e568dd7..b483451 100644 --- a/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java +++ b/src/java/org/apache/cassandra/cql/CreateColumnFamilyStatement.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.cassandra.cache.CachingOptions; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.composites.SimpleDenseCellNameType; @@ -196,8 +197,7 @@ public class CreateColumnFamilyStatement .compactionStrategyClass(cfProps.compactionStrategyClass) .compactionStrategyOptions(cfProps.compactionStrategyOptions) .compressionParameters(CompressionParameters.create(cfProps.compressionParameters)) - .caching(CFMetaData.Caching.fromString(getPropertyString(CFPropDefs.KW_CACHING, CFMetaData.DEFAULT_CACHING_STRATEGY.toString()))) - .rowsPerPartitionToCache(CFMetaData.RowsPerPartitionToCache.fromString(cfProps.getPropertyString(CFPropDefs.KW_ROWS_PER_PARTITION_TO_CACHE, CFMetaData.DEFAULT_ROWS_PER_PARTITION_TO_CACHE.toString()))) + .caching(CachingOptions.fromString(getPropertyString(CFPropDefs.KW_CACHING, CFMetaData.DEFAULT_CACHING_STRATEGY.toString()))) .speculativeRetry(CFMetaData.SpeculativeRetry.fromString(getPropertyString(CFPropDefs.KW_SPECULATIVE_RETRY, CFMetaData.DEFAULT_SPECULATIVE_RETRY.toString()))) .bloomFilterFpChance(getPropertyDouble(CFPropDefs.KW_BF_FP_CHANCE, null)) .memtableFlushPeriod(getPropertyInt(CFPropDefs.KW_MEMTABLE_FLUSH_PERIOD, 0)) http://git-wip-us.apache.org/repos/asf/cassandra/blob/da444a69/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java b/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java index f473e22..95fb750 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java +++ b/src/java/org/apache/cassandra/cql3/statements/CFPropDefs.java @@ -19,6 +19,7 @@ package org.apache.cassandra.cql3.statements; import java.util.*; +import org.apache.cassandra.cache.CachingOptions; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.CFMetaData.SpeculativeRetry; import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; @@ -149,6 +150,21 @@ public class CFPropDefs extends PropertyDefinitions return new HashMap<>(); return compressionOptions; } + public CachingOptions getCachingOptions() throws SyntaxException, ConfigurationException + { + CachingOptions options = null; + Object val = properties.get(KW_CACHING); + if (val == null) + return null; + else if (val instanceof Map) + options = CachingOptions.fromMap(getMap(KW_CACHING)); + else if (val instanceof String) // legacy syntax + { + options = CachingOptions.fromString(getSimple(KW_CACHING)); + logger.warn("Setting caching options with deprecated syntax."); + } + return options; + } public void applyToCFMetadata(CFMetaData cfm) throws ConfigurationException, SyntaxException { @@ -164,12 +180,6 @@ public class CFPropDefs extends PropertyDefinitions throw new ConfigurationException("Disabling compaction by setting compaction thresholds to 0 has been deprecated, set the compaction option 'enabled' to false instead."); cfm.minCompactionThreshold(minCompactionThreshold); cfm.maxCompactionThreshold(maxCompactionThreshold); - cfm.caching(CFMetaData.Caching.fromString(getString(KW_CACHING, cfm.getCaching().toString()))); - CFMetaData.RowsPerPartitionToCache newRppc = CFMetaData.RowsPerPartitionToCache.fromString(getString(KW_ROWS_PER_PARTITION_TO_CACHE, cfm.getRowsPerPartitionToCache().toString())); - // we need to invalidate row cache if the amount of rows cached changes, otherwise we might serve out bad data. - if (!cfm.getRowsPerPartitionToCache().equals(newRppc)) - CacheService.instance.invalidateRowCacheForCf(cfm.cfId); - cfm.rowsPerPartitionToCache(newRppc); cfm.defaultTimeToLive(getInt(KW_DEFAULT_TIME_TO_LIVE, cfm.getDefaultTimeToLive())); cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(getString(KW_SPECULATIVE_RETRY, cfm.getSpeculativeRetry().toString()))); cfm.memtableFlushPeriod(getInt(KW_MEMTABLE_FLUSH_PERIOD, cfm.getMemtableFlushPeriod())); @@ -187,6 +197,9 @@ public class CFPropDefs extends PropertyDefinitions if (!getCompressionOptions().isEmpty()) cfm.compressionParameters(CompressionParameters.create(getCompressionOptions())); + CachingOptions cachingOptions = getCachingOptions(); + if (cachingOptions != null) + cfm.caching(cachingOptions); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/da444a69/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 8d064dd..34aa5f5 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -77,8 +77,6 @@ import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.*; import org.apache.cassandra.utils.concurrent.OpOrder; -import static org.apache.cassandra.config.CFMetaData.Caching; - public class ColumnFamilyStore implements ColumnFamilyStoreMBean { private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class); @@ -276,7 +274,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean fileIndexGenerator.set(generation); sampleLatencyNanos = DatabaseDescriptor.getReadRpcTimeout() / 2; - Caching caching = metadata.getCaching(); + CachingOptions caching = metadata.getCaching(); logger.info("Initializing {}.{}", keyspace.getName(), name); @@ -290,7 +288,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean data.addInitialSSTables(sstables); } - if (caching == Caching.ALL || caching == Caching.KEYS_ONLY) + if (caching.keyCache.isEnabled()) CacheService.instance.keyCache.loadSaved(this); // compaction strategy should be created after the CFS has been prepared @@ -1498,7 +1496,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean try { // If we are explicitely asked to fill the cache with full partitions, we go ahead and query the whole thing - if (metadata.getRowsPerPartitionToCache().cacheFullPartitions()) + if (metadata.getCaching().rowCache.cacheFullPartitions()) { data = getTopLevelColumns(QueryFilter.getIdentityFilter(filter.key, name, filter.timestamp), Integer.MIN_VALUE); toCache = data; @@ -1521,7 +1519,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean if (filter.filter.isHeadFilter() && filter.filter.countCQL3Rows(metadata.comparator)) { SliceQueryFilter sliceFilter = (SliceQueryFilter)filter.filter; - int rowsToCache = metadata.getRowsPerPartitionToCache().rowsToCache; + int rowsToCache = metadata.getCaching().rowCache.rowsToCache; SliceQueryFilter cacheSlice = readFilterForCache(); QueryFilter cacheFilter = new QueryFilter(filter.key, name, cacheSlice, filter.timestamp); @@ -1578,7 +1576,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public SliceQueryFilter readFilterForCache() { // We create a new filter everytime before for now SliceQueryFilter is unfortunatly mutable. - return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, metadata.getRowsPerPartitionToCache().rowsToCache, metadata.clusteringColumns().size()); + return new SliceQueryFilter(ColumnSlice.ALL_COLUMNS_ARRAY, false, metadata.getCaching().rowCache.rowsToCache, metadata.clusteringColumns().size()); } public boolean isFilterFullyCoveredBy(IDiskAtomFilter filter, ColumnFamily cachedCf, long now) @@ -1592,7 +1590,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // columns: if we use a timestamp newer than the one that was used when populating the cache, we might // end up deciding the whole partition is cached when it's really not (just some rows expired since the // cf was cached). This is the reason for Integer.MIN_VALUE below. - boolean wholePartitionCached = cachedCf.liveCQL3RowCount(Integer.MIN_VALUE) < metadata.getRowsPerPartitionToCache().rowsToCache; + boolean wholePartitionCached = cachedCf.liveCQL3RowCount(Integer.MIN_VALUE) < metadata.getCaching().rowCache.rowsToCache; // Contrarily to the "wholePartitionCached" check above, we do want isFullyCoveredBy to take the // timestamp of the query into account when dealing with expired columns. Otherwise, we could think @@ -2674,9 +2672,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean private boolean isRowCacheEnabled() { - return !(metadata.getCaching() == Caching.NONE - || metadata.getCaching() == Caching.KEYS_ONLY - || CacheService.instance.rowCache.getCapacity() == 0); + return metadata.getCaching().rowCache.isEnabled() && CacheService.instance.rowCache.getCapacity() > 0; } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/da444a69/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index 292d3da..0273341 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -33,6 +33,7 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.cache.CachingOptions; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.KSMetaData; @@ -112,7 +113,7 @@ public class SystemKeyspace setupVersion(); migrateIndexInterval(); - + migrateCachingOption(); // add entries to system schema columnfamilies for the hardcoded system definitions for (String ksname : Schema.systemKeyspaceNames) { @@ -175,6 +176,36 @@ public class SystemKeyspace } } + private static void migrateCachingOption() + { + for (UntypedResultSet.Row row : processInternal(String.format("SELECT * FROM system.%s", SCHEMA_COLUMNFAMILIES_CF))) + { + if (!row.has("caching")) + continue; + + if (!CachingOptions.isLegacy(row.getString("caching"))) + continue; + try + { + CachingOptions caching = CachingOptions.fromString(row.getString("caching")); + CFMetaData table = CFMetaData.fromSchema(row); + logger.info("Migrating caching option {} to {} for {}.{}", row.getString("caching"), caching.toString(), table.ksName, table.cfName); + String query = String.format("SELECT writetime(type) " + + "FROM system.%s " + + "WHERE keyspace_name = '%s' AND columnfamily_name = '%s'", + SCHEMA_COLUMNFAMILIES_CF, + table.ksName, + table.cfName); + long timestamp = processInternal(query).one().getLong("writetime(type)"); + table.toSchema(timestamp).apply(); + } + catch (ConfigurationException e) + { + // shouldn't happen + } + } + } + /** * Write compaction log, except columfamilies under system keyspace. * http://git-wip-us.apache.org/repos/asf/cassandra/blob/da444a69/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java index c02e397..17e9b8f 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@ -36,6 +36,7 @@ import com.google.common.util.concurrent.RateLimiter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.cache.CachingOptions; import org.apache.cassandra.cache.InstrumentingCache; import org.apache.cassandra.cache.KeyCacheKey; import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; @@ -1068,10 +1069,9 @@ public class SSTableReader extends SSTable implements Closeable public void cacheKey(DecoratedKey key, RowIndexEntry info) { - CFMetaData.Caching caching = metadata.getCaching(); + CachingOptions caching = metadata.getCaching(); - if (caching == CFMetaData.Caching.NONE - || caching == CFMetaData.Caching.ROWS_ONLY + if (!caching.keyCache.isEnabled() || keyCache == null || keyCache.getCapacity() == 0) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/da444a69/test/unit/org/apache/cassandra/SchemaLoader.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java index f67386a..5fb5697 100644 --- a/test/unit/org/apache/cassandra/SchemaLoader.java +++ b/test/unit/org/apache/cassandra/SchemaLoader.java @@ -21,6 +21,7 @@ import java.io.File; import java.nio.ByteBuffer; import java.util.*; +import org.apache.cassandra.cache.CachingOptions; import org.apache.cassandra.db.index.PerRowSecondaryIndexTest; import org.apache.cassandra.db.index.SecondaryIndex; import org.junit.AfterClass; @@ -164,7 +165,9 @@ public class SchemaLoader standardCFMD(ks1, "legacyleveled") .compactionStrategyClass(LeveledCompactionStrategy.class) .compactionStrategyOptions(leveledOptions), - standardCFMD(ks1, "StandardLowIndexInterval").minIndexInterval(8).maxIndexInterval(256).caching(CFMetaData.Caching.NONE))); + standardCFMD(ks1, "StandardLowIndexInterval").minIndexInterval(8) + .maxIndexInterval(256) + .caching(CachingOptions.NONE))); // Keyspace 2 @@ -228,9 +231,12 @@ public class SchemaLoader schema.add(KSMetaData.testMetadata(ks_rcs, simple, opts_rf1, - standardCFMD(ks_rcs, "CFWithoutCache").caching(CFMetaData.Caching.NONE), - standardCFMD(ks_rcs, "CachedCF").caching(CFMetaData.Caching.ALL).rowsPerPartitionToCache(CFMetaData.RowsPerPartitionToCache.fromString("ALL")), - standardCFMD(ks_rcs, "CachedIntCF").defaultValidator(IntegerType.instance).caching(CFMetaData.Caching.ALL))); + standardCFMD(ks_rcs, "CFWithoutCache").caching(CachingOptions.NONE), + standardCFMD(ks_rcs, "CachedCF").caching(CachingOptions.ALL), + standardCFMD(ks_rcs, "CachedIntCF"). + defaultValidator(IntegerType.instance). + caching(new CachingOptions(new CachingOptions.KeyCache(CachingOptions.KeyCache.Type.ALL), + new CachingOptions.RowCache(CachingOptions.RowCache.Type.HEAD, 100))))); // CounterCacheSpace schema.add(KSMetaData.testMetadata(ks_ccs,