Merge branch 'cassandra-2.2' into cassandra-3.0
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9218d745 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9218d745 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9218d745 Branch: refs/heads/trunk Commit: 9218d7456b36d20ebe78bab23594e67d2f0c4a20 Parents: fde97c3 e63dacf Author: Robert Stupp <[email protected]> Authored: Wed Sep 16 22:00:32 2015 +0200 Committer: Robert Stupp <[email protected]> Committed: Wed Sep 16 22:00:32 2015 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/cache/AutoSavingCache.java | 195 +++++++++++-------- .../org/apache/cassandra/cache/CacheKey.java | 14 +- .../apache/cassandra/cache/CounterCacheKey.java | 26 +-- .../org/apache/cassandra/cache/KeyCacheKey.java | 19 +- .../org/apache/cassandra/cache/OHCProvider.java | 39 +++- .../org/apache/cassandra/cache/RowCacheKey.java | 34 ++-- .../org/apache/cassandra/config/CFMetaData.java | 10 + .../cassandra/config/DatabaseDescriptor.java | 19 +- .../org/apache/cassandra/config/Schema.java | 58 +++++- .../apache/cassandra/db/ColumnFamilyStore.java | 83 +++----- src/java/org/apache/cassandra/db/Keyspace.java | 6 - .../org/apache/cassandra/db/RowIndexEntry.java | 2 +- .../db/SinglePartitionReadCommand.java | 3 +- .../io/sstable/format/SSTableReader.java | 10 +- .../io/sstable/format/big/BigTableReader.java | 2 +- .../apache/cassandra/service/CacheService.java | 54 +++-- .../cassandra/service/CassandraDaemon.java | 39 +++- .../cassandra/service/StorageService.java | 29 +-- .../org/apache/cassandra/utils/FBUtilities.java | 16 ++ .../cassandra/cache/AutoSavingCacheTest.java | 5 +- .../cassandra/cache/CacheProviderTest.java | 24 ++- .../apache/cassandra/cql3/KeyCacheCqlTest.java | 101 ++++++++++ .../apache/cassandra/db/CounterCacheTest.java | 68 ++++++- .../org/apache/cassandra/db/KeyCacheTest.java | 2 +- .../org/apache/cassandra/db/RowCacheTest.java | 41 +++- 26 files changed, 612 insertions(+), 288 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index b213260,96ec0fa..8e3e947 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,18 -1,15 +1,19 @@@ -2.2.2 +3.0.0-rc1 + * Improve MV schema representation (CASSANDRA-9921) + * Add flag to enable/disable coordinator batchlog for MV writes (CASSANDRA-10230) + * Update cqlsh COPY for new internal driver serialization interface (CASSANDRA-10318) + * Give index implementations more control over rebuild operations (CASSANDRA-10312) + * Update index file format (CASSANDRA-10314) + * Add "shadowable" row tombstones to deal with mv timestamp issues (CASSANDRA-10261) + * CFS.loadNewSSTables() broken for pre-3.0 sstables + * Cache selected index in read command to reduce lookups (CASSANDRA-10215) + * Small optimizations of sstable index serialization (CASSANDRA-10232) + * Support for both encrypted and unencrypted native transport connections (CASSANDRA-9590) +Merged from 2.2: * Defer default role manager setup until all nodes are on 2.2+ (CASSANDRA-9761) - * Cancel transaction for sstables we wont redistribute index summary - for (CASSANDRA-10270) - * Handle missing RoleManager in config after upgrade to 2.2 (CASSANDRA-10209) - * Retry snapshot deletion after compaction and gc on Windows (CASSANDRA-10222) - * Fix failure to start with 
space in directory path on Windows (CASSANDRA-10239) - * Fix repair hang when snapshot failed (CASSANDRA-10057) - * Fall back to 1/4 commitlog volume for commitlog_total_space on small disks - (CASSANDRA-10199) + * Handle missing RoleManager in config after upgrade to 2.2 (CASSANDRA-10209) Merged from 2.1: + * Fix cache handling of 2i and base tables (CASSANDRA-10155) * Fix NPE in nodetool compactionhistory (CASSANDRA-9758) * (Pig) support BulkOutputFormat as a URL parameter (CASSANDRA-7410) * BATCH statement is broken in cqlsh (CASSANDRA-10272) http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/cache/AutoSavingCache.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cache/AutoSavingCache.java index ebd2830,3ec9d4e..4558bb7 --- a/src/java/org/apache/cassandra/cache/AutoSavingCache.java +++ b/src/java/org/apache/cassandra/cache/AutoSavingCache.java @@@ -35,7 -42,7 +42,6 @@@ import org.apache.cassandra.db.SystemKe import org.apache.cassandra.db.compaction.CompactionInfo; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.OperationType; --import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.util.*; import org.apache.cassandra.io.util.ChecksummedRandomAccessReader.CorruptFileException; @@@ -62,7 -68,15 +68,17 @@@ public class AutoSavingCache<K extends protected final CacheService.CacheType cacheType; private final CacheSerializer<K, V> cacheLoader; + + /* + * CASSANDRA-10155 required a format change to fix 2i indexes and caching. + * 2.2 is already at version "c" and 3.0 is at "d". + * + * Since cache versions match exactly and there is no partial fallback just add + * a minor version letter. 
++ * ++ * Sticking with "d" is fine for 3.0 since it has never been released or used by another version + */ - private static final String CURRENT_VERSION = "ca"; + private static final String CURRENT_VERSION = "d"; private static volatile IStreamFactory streamFactory = new IStreamFactory() { @@@ -130,33 -142,90 +144,90 @@@ } } - public int loadSaved(ColumnFamilyStore cfs) + public ListenableFuture<Integer> loadSavedAsync() + { + final ListeningExecutorService es = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor()); + final long start = System.nanoTime(); + + ListenableFuture<Integer> cacheLoad = es.submit(new Callable<Integer>() + { + @Override + public Integer call() throws Exception + { + return loadSaved(); + } + }); + cacheLoad.addListener(new Runnable() { + @Override + public void run() + { + if (size() > 0) + logger.info("Completed loading ({} ms; {} keys) {} cache", + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), + CacheService.instance.keyCache.size(), + cacheType); + es.shutdown(); + } - }, MoreExecutors.sameThreadExecutor()); ++ }, MoreExecutors.directExecutor()); + + return cacheLoad; + } + + public int loadSaved() { int count = 0; long start = System.nanoTime(); // modern format, allows both key and value (so key cache load can be purely sequential) - File dataPath = getCacheDataPath(cfs.metadata.cfId, CURRENT_VERSION); - File crcPath = getCacheCrcPath(cfs.metadata.cfId, CURRENT_VERSION); + File dataPath = getCacheDataPath(CURRENT_VERSION); + File crcPath = getCacheCrcPath(CURRENT_VERSION); if (dataPath.exists() && crcPath.exists()) { - DataInputStream in = null; + DataInputStreamPlus in = null; try { logger.info(String.format("reading saved cache %s", dataPath)); - in = new DataInputStream(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(dataPath, crcPath)), dataPath.length())); + in = new DataInputStreamPlus(new LengthAvailableInputStream(new BufferedInputStream(streamFactory.getInputStream(dataPath, crcPath)), dataPath.length())); - List<Future<Pair<K, V>>> futures = new ArrayList<Future<Pair<K, V>>>(); + ArrayDeque<Future<Pair<K, V>>> futures = new ArrayDeque<Future<Pair<K, V>>>(); while (in.available() > 0) { - Future<Pair<K, V>> entry = cacheLoader.deserialize(in, cfs); + //ksname and cfname are serialized by the serializers in CacheService + //That is delegated there because there are serializer specific conditions + //where a cache key is skipped and not written + String ksname = in.readUTF(); + String cfname = in.readUTF(); + + ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreIncludingIndexes(Pair.create(ksname, cfname)); + + Future<Pair<K, V>> entryFuture = cacheLoader.deserialize(in, cfs); // Key cache entry can return null, if the SSTable doesn't exist. - if (entry == null) + if (entryFuture == null) continue; - futures.add(entry); + + futures.offer(entryFuture); count++; + + /* + * Kind of unwise to accrue an unbounded number of pending futures + * So now there is this loop to keep a bounded number pending. 
+ */ + do + { + while (futures.peek() != null && futures.peek().isDone()) + { + Future<Pair<K, V>> future = futures.poll(); + Pair<K, V> entry = future.get(); + if (entry != null && entry.right != null) + put(entry.left, entry.right); + } + + if (futures.size() > 1000) + Thread.yield(); + } while(futures.size() > 1000); } - for (Future<Pair<K, V>> future : futures) + Future<Pair<K, V>> future = null; + while ((future = futures.poll()) != null) { Pair<K, V> entry = future.get(); if (entry != null && entry.right != null) @@@ -377,8 -416,8 +418,8 @@@ public interface CacheSerializer<K extends CacheKey, V> { - void serialize(K key, DataOutputPlus out) throws IOException; + void serialize(K key, DataOutputPlus out, ColumnFamilyStore cfs) throws IOException; - Future<Pair<K, V>> deserialize(DataInputStream in, ColumnFamilyStore cfs) throws IOException; + Future<Pair<K, V>> deserialize(DataInputPlus in, ColumnFamilyStore cfs) throws IOException; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/cache/CounterCacheKey.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cache/CounterCacheKey.java index 00766ee,68856eb..8b173bf --- a/src/java/org/apache/cassandra/cache/CounterCacheKey.java +++ b/src/java/org/apache/cassandra/cache/CounterCacheKey.java @@@ -19,53 -19,30 +19,45 @@@ package org.apache.cassandra.cache import java.nio.ByteBuffer; import java.util.Arrays; - import java.util.UUID; -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.db.composites.CellNames; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.CellPath; +import org.apache.cassandra.db.marshal.CompositeType; import org.apache.cassandra.utils.*; - public class CounterCacheKey implements CacheKey + public final class CounterCacheKey extends CacheKey { - private static final long EMPTY_SIZE = ObjectSizes.measure(new CounterCacheKey(null, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBuffer.allocate(1))) - + ObjectSizes.measure(new UUID(0, 0)); - private static final long EMPTY_SIZE = ObjectSizes.measure(new CounterCacheKey(null, ByteBufferUtil.EMPTY_BYTE_BUFFER, CellNames.simpleDense(ByteBuffer.allocate(1)))); ++ private static final long EMPTY_SIZE = ObjectSizes.measure(new CounterCacheKey(null, ByteBufferUtil.EMPTY_BYTE_BUFFER, ByteBuffer.allocate(1))); - public final UUID cfId; public final byte[] partitionKey; public final byte[] cellName; - public CounterCacheKey(UUID cfId, ByteBuffer partitionKey, ByteBuffer cellName) - private CounterCacheKey(Pair<String, String> ksAndCFName, ByteBuffer partitionKey, CellName cellName) ++ public CounterCacheKey(Pair<String, String> ksAndCFName, ByteBuffer partitionKey, ByteBuffer cellName) { - this.cfId = cfId; + super(ksAndCFName); this.partitionKey = ByteBufferUtil.getArray(partitionKey); - this.cellName = ByteBufferUtil.getArray(cellName.toByteBuffer()); + this.cellName = ByteBufferUtil.getArray(cellName); } - public static CounterCacheKey create(UUID cfId, ByteBuffer partitionKey, Clustering clustering, ColumnDefinition c, CellPath path) - public static CounterCacheKey create(Pair<String, String> ksAndCFName, ByteBuffer partitionKey, CellName cellName) ++ public static CounterCacheKey create(Pair<String, String> ksAndCFName, ByteBuffer partitionKey, Clustering clustering, ColumnDefinition c, CellPath path) { - return new CounterCacheKey(cfId, partitionKey, makeCellName(clustering, c, 
path)); - return new CounterCacheKey(ksAndCFName, partitionKey, cellName); ++ return new CounterCacheKey(ksAndCFName, partitionKey, makeCellName(clustering, c, path)); + } + + private static ByteBuffer makeCellName(Clustering clustering, ColumnDefinition c, CellPath path) + { + int cs = clustering.size(); + ByteBuffer[] values = new ByteBuffer[cs + 1 + (path == null ? 0 : path.size())]; + for (int i = 0; i < cs; i++) + values[i] = clustering.get(i); + values[cs] = c.name.bytes; + if (path != null) + for (int i = 0; i < path.size(); i++) + values[cs + 1 + i] = path.get(i); + return CompositeType.build(values); } - public UUID getCFId() - { - return cfId; - } - public long unsharedHeapSize() { return EMPTY_SIZE http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/cache/OHCProvider.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/cache/OHCProvider.java index c6c6bb7,9b1c8cf..8a7bdfc --- a/src/java/org/apache/cassandra/cache/OHCProvider.java +++ b/src/java/org/apache/cassandra/cache/OHCProvider.java @@@ -17,18 -17,22 +17,18 @@@ */ package org.apache.cassandra.cache; -import java.io.DataInput; -import java.io.DataOutput; import java.io.IOException; import java.nio.ByteBuffer; -import java.nio.channels.WritableByteChannel; import java.util.Iterator; - import java.util.UUID; -import com.google.common.base.Function; - import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.ColumnFamily; import org.apache.cassandra.db.TypeSizes; -import org.apache.cassandra.io.util.DataOutputPlus; -import org.apache.cassandra.io.util.Memory; -import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.db.partitions.CachedPartition; +import org.apache.cassandra.io.util.DataInputBuffer; ++import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.io.util.DataOutputBufferFixed; - import org.apache.cassandra.io.util.NIODataInputStream; +import org.apache.cassandra.io.util.RebufferingInputStream; + import org.apache.cassandra.utils.Pair; import org.caffinitas.ohc.OHCache; import org.caffinitas.ohc.OHCacheBuilder; @@@ -122,27 -126,29 +122,50 @@@ public class OHCProvider implements Cac private static class KeySerializer implements org.caffinitas.ohc.CacheSerializer<RowCacheKey> { - public void serialize(RowCacheKey rowCacheKey, DataOutput dataOutput) throws IOException - { - dataOutput.writeUTF(rowCacheKey.ksAndCFName.left); - dataOutput.writeUTF(rowCacheKey.ksAndCFName.right); - dataOutput.writeInt(rowCacheKey.key.length); - dataOutput.write(rowCacheKey.key); - } - - public RowCacheKey deserialize(DataInput dataInput) throws IOException - { - String ksName = dataInput.readUTF(); - String cfName = dataInput.readUTF(); - byte[] key = new byte[dataInput.readInt()]; - dataInput.readFully(key); + private static KeySerializer instance = new KeySerializer(); + public void serialize(RowCacheKey rowCacheKey, ByteBuffer buf) + { - buf.putLong(rowCacheKey.cfId.getMostSignificantBits()); - buf.putLong(rowCacheKey.cfId.getLeastSignificantBits()); ++ @SuppressWarnings("resource") ++ DataOutputBuffer dataOutput = new DataOutputBufferFixed(buf); ++ try ++ { ++ dataOutput.writeUTF(rowCacheKey.ksAndCFName.left); ++ dataOutput.writeUTF(rowCacheKey.ksAndCFName.right); ++ } ++ catch (IOException e) ++ { ++ throw new RuntimeException(e); ++ } + buf.putInt(rowCacheKey.key.length); + buf.put(rowCacheKey.key); + } + + public RowCacheKey deserialize(ByteBuffer buf) + { 
- long msb = buf.getLong(); - long lsb = buf.getLong(); ++ @SuppressWarnings("resource") ++ DataInputBuffer dataInput = new DataInputBuffer(buf, false); ++ String ksName = null; ++ String cfName = null; ++ try ++ { ++ ksName = dataInput.readUTF(); ++ cfName = dataInput.readUTF(); ++ } ++ catch (IOException e) ++ { ++ throw new RuntimeException(e); ++ } + byte[] key = new byte[buf.getInt()]; + buf.get(key); - return new RowCacheKey(new UUID(msb, lsb), key); + return new RowCacheKey(Pair.create(ksName, cfName), key); } public int serializedSize(RowCacheKey rowCacheKey) { - return 20 + rowCacheKey.key.length; - return TypeSizes.NATIVE.sizeof(rowCacheKey.ksAndCFName.left) - + TypeSizes.NATIVE.sizeof(rowCacheKey.ksAndCFName.right) ++ return TypeSizes.sizeof(rowCacheKey.ksAndCFName.left) ++ + TypeSizes.sizeof(rowCacheKey.ksAndCFName.right) + + 4 + + rowCacheKey.key.length; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/config/CFMetaData.java index 929a34a,348eb89..69bf6bf --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@@ -43,15 -40,20 +43,18 @@@ import org.apache.cassandra.cql3.QueryP import org.apache.cassandra.cql3.statements.CFStatement; import org.apache.cassandra.cql3.statements.CreateTableStatement; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.compaction.*; -import org.apache.cassandra.db.composites.*; -import org.apache.cassandra.db.index.SecondaryIndex; +import org.apache.cassandra.db.compaction.AbstractCompactionStrategy; import org.apache.cassandra.db.marshal.*; -import org.apache.cassandra.exceptions.*; -import org.apache.cassandra.io.compress.CompressionParameters; -import org.apache.cassandra.io.compress.LZ4Compressor; -import org.apache.cassandra.io.sstable.format.Version; -import org.apache.cassandra.io.util.FileDataInput; -import org.apache.cassandra.schema.LegacySchemaTables; -import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.schema.*; +import org.apache.cassandra.utils.*; + import org.apache.cassandra.utils.FBUtilities; + import org.apache.cassandra.utils.Pair; + import org.apache.cassandra.utils.UUIDGen; import org.github.jamm.Unmetered; /** @@@ -73,29 -168,30 +76,31 @@@ public final class CFMetaDat public final UUID cfId; // internal id, never exposed to user public final String ksName; // name of keyspace public final String cfName; // name of this column family + public final Pair<String, String> ksAndCFName; + public final byte[] ksAndCFBytes; - public final ColumnFamilyType cfType; // standard, super - public volatile CellNameType comparator; // bytes, long, timeuuid, utf8, etc. 
- - //OPTIONAL - private volatile String comment = ""; - private volatile double readRepairChance = DEFAULT_READ_REPAIR_CHANCE; - private volatile double dcLocalReadRepairChance = DEFAULT_DCLOCAL_READ_REPAIR_CHANCE; - private volatile int gcGraceSeconds = DEFAULT_GC_GRACE_SECONDS; - private volatile AbstractType<?> defaultValidator = BytesType.instance; + + private final ImmutableSet<Flag> flags; + private final boolean isDense; + private final boolean isCompound; + private final boolean isSuper; + private final boolean isCounter; + private final boolean isView; + + private final boolean isIndex; + + public volatile ClusteringComparator comparator; // bytes, long, timeuuid, utf8, etc. This is built directly from clusteringColumns + public final IPartitioner partitioner; // partitioner the table uses + + private final Serializers serializers; + + // non-final, for now + public volatile TableParams params = TableParams.DEFAULT; + private volatile AbstractType<?> keyValidator = BytesType.instance; - private volatile int minCompactionThreshold = DEFAULT_MIN_COMPACTION_THRESHOLD; - private volatile int maxCompactionThreshold = DEFAULT_MAX_COMPACTION_THRESHOLD; - private volatile Double bloomFilterFpChance = null; - private volatile CachingOptions caching = DEFAULT_CACHING_STRATEGY; - private volatile int minIndexInterval = DEFAULT_MIN_INDEX_INTERVAL; - private volatile int maxIndexInterval = DEFAULT_MAX_INDEX_INTERVAL; - private volatile int memtableFlushPeriod = 0; - private volatile int defaultTimeToLive = DEFAULT_DEFAULT_TIME_TO_LIVE; - private volatile SpeculativeRetry speculativeRetry = DEFAULT_SPECULATIVE_RETRY; - private volatile Map<ColumnIdentifier, Long> droppedColumns = new HashMap<>(); - private volatile Map<String, TriggerDefinition> triggers = new HashMap<>(); - private volatile boolean isPurged = false; + private volatile Map<ByteBuffer, DroppedColumn> droppedColumns = new HashMap<>(); + private volatile Triggers triggers = Triggers.none(); + private volatile Indexes indexes = Indexes.none(); + /* * All CQL3 columns definition are stored in the columnMetadata map. * On top of that, we keep separated collection of each kind of definition, to @@@ -103,278 -199,101 +108,283 @@@ * clustering key ones, those list are ordered by the "component index" of the * elements. */ - public static final String DEFAULT_KEY_ALIAS = "key"; - public static final String DEFAULT_COLUMN_ALIAS = "column"; - public static final String DEFAULT_VALUE_ALIAS = "value"; - - // We call dense a CF for which each component of the comparator is a clustering column, i.e. no - // component is used to store a regular column names. In other words, non-composite static "thrift" - // and CQL3 CF are *not* dense. 
- private volatile Boolean isDense; // null means "we don't know and need to infer from other data" - - private volatile Map<ByteBuffer, ColumnDefinition> columnMetadata = new HashMap<>(); + private final Map<ByteBuffer, ColumnDefinition> columnMetadata = new ConcurrentHashMap<>(); // not on any hot path private volatile List<ColumnDefinition> partitionKeyColumns; // Always of size keyValidator.componentsCount, null padded if necessary private volatile List<ColumnDefinition> clusteringColumns; // Of size comparator.componentsCount or comparator.componentsCount -1, null padded if necessary - private volatile SortedSet<ColumnDefinition> regularColumns; // We use a sorted set so iteration is of predictable order (for SELECT for instance) - private volatile SortedSet<ColumnDefinition> staticColumns; // Same as above - private volatile ColumnDefinition compactValueColumn; + private volatile PartitionColumns partitionColumns; // Always non-PK, non-clustering columns - public volatile Class<? extends AbstractCompactionStrategy> compactionStrategyClass = DEFAULT_COMPACTION_STRATEGY_CLASS; - public volatile Map<String, String> compactionStrategyOptions = new HashMap<>(); - - public volatile CompressionParameters compressionParameters = new CompressionParameters(null); - - // attribute setters that return the modified CFMetaData instance - public CFMetaData comment(String prop) {comment = Strings.nullToEmpty(prop); return this;} - public CFMetaData readRepairChance(double prop) {readRepairChance = prop; return this;} - public CFMetaData dcLocalReadRepairChance(double prop) {dcLocalReadRepairChance = prop; return this;} - public CFMetaData gcGraceSeconds(int prop) {gcGraceSeconds = prop; return this;} - public CFMetaData defaultValidator(AbstractType<?> prop) {defaultValidator = prop; return this;} - public CFMetaData keyValidator(AbstractType<?> prop) {keyValidator = prop; return this;} - public CFMetaData minCompactionThreshold(int prop) {minCompactionThreshold = prop; return this;} - public CFMetaData maxCompactionThreshold(int prop) {maxCompactionThreshold = prop; return this;} - public CFMetaData compactionStrategyClass(Class<? extends AbstractCompactionStrategy> prop) {compactionStrategyClass = prop; return this;} - public CFMetaData compactionStrategyOptions(Map<String, String> prop) {compactionStrategyOptions = prop; return this;} - public CFMetaData compressionParameters(CompressionParameters prop) {compressionParameters = prop; return this;} - public CFMetaData bloomFilterFpChance(double prop) {bloomFilterFpChance = prop; return this;} - public CFMetaData caching(CachingOptions prop) {caching = prop; return this;} - public CFMetaData minIndexInterval(int prop) {minIndexInterval = prop; return this;} - public CFMetaData maxIndexInterval(int prop) {maxIndexInterval = prop; return this;} - public CFMetaData memtableFlushPeriod(int prop) {memtableFlushPeriod = prop; return this;} - public CFMetaData defaultTimeToLive(int prop) {defaultTimeToLive = prop; return this;} - public CFMetaData speculativeRetry(SpeculativeRetry prop) {speculativeRetry = prop; return this;} - public CFMetaData droppedColumns(Map<ColumnIdentifier, Long> cols) {droppedColumns = cols; return this;} - public CFMetaData triggers(Map<String, TriggerDefinition> prop) {triggers = prop; return this;} - public CFMetaData isDense(Boolean prop) {isDense = prop; return this;} + // For dense tables, this alias the single non-PK column the table contains (since it can only have one). 
We keep + // that as convenience to access that column more easily (but we could replace calls by partitionColumns().iterator().next() + // for those tables in practice). + private volatile ColumnDefinition compactValueColumn; - /** - * Create new ColumnFamily metadata with generated random ID. - * When loading from existing schema, use CFMetaData - * - * @param keyspace keyspace name - * @param name column family name - * @param comp default comparator + /* + * All of these methods will go away once CFMetaData becomes completely immutable. */ - public CFMetaData(String keyspace, String name, ColumnFamilyType type, CellNameType comp) + public CFMetaData params(TableParams params) + { + this.params = params; + return this; + } + + public CFMetaData bloomFilterFpChance(double prop) + { + params = TableParams.builder(params).bloomFilterFpChance(prop).build(); + return this; + } + + public CFMetaData caching(CachingParams prop) + { + params = TableParams.builder(params).caching(prop).build(); + return this; + } + + public CFMetaData comment(String prop) + { + params = TableParams.builder(params).comment(prop).build(); + return this; + } + + public CFMetaData compaction(CompactionParams prop) + { + params = TableParams.builder(params).compaction(prop).build(); + return this; + } + + public CFMetaData compression(CompressionParams prop) + { + params = TableParams.builder(params).compression(prop).build(); + return this; + } + + public CFMetaData dcLocalReadRepairChance(double prop) + { + params = TableParams.builder(params).dcLocalReadRepairChance(prop).build(); + return this; + } + + public CFMetaData defaultTimeToLive(int prop) + { + params = TableParams.builder(params).defaultTimeToLive(prop).build(); + return this; + } + + public CFMetaData gcGraceSeconds(int prop) + { + params = TableParams.builder(params).gcGraceSeconds(prop).build(); + return this; + } + + public CFMetaData maxIndexInterval(int prop) + { + params = TableParams.builder(params).maxIndexInterval(prop).build(); + return this; + } + + public CFMetaData memtableFlushPeriod(int prop) + { + params = TableParams.builder(params).memtableFlushPeriodInMs(prop).build(); + return this; + } + + public CFMetaData minIndexInterval(int prop) + { + params = TableParams.builder(params).minIndexInterval(prop).build(); + return this; + } + + public CFMetaData readRepairChance(double prop) + { + params = TableParams.builder(params).readRepairChance(prop).build(); + return this; + } + + public CFMetaData speculativeRetry(SpeculativeRetryParam prop) + { + params = TableParams.builder(params).speculativeRetry(prop).build(); + return this; + } + + public CFMetaData extensions(Map<String, ByteBuffer> extensions) + { + params = TableParams.builder(params).extensions(extensions).build(); + return this; + } + + public CFMetaData droppedColumns(Map<ByteBuffer, DroppedColumn> cols) + { + droppedColumns = cols; + return this; + } + + public CFMetaData triggers(Triggers prop) { - this(keyspace, name, type, comp, UUIDGen.getTimeUUID()); + triggers = prop; + return this; } - public CFMetaData(String keyspace, String name, ColumnFamilyType type, CellNameType comp, UUID id) + public CFMetaData indexes(Indexes indexes) { - cfId = id; - ksName = keyspace; - cfName = name; + this.indexes = indexes; + return this; + } + + private CFMetaData(String keyspace, + String name, + UUID cfId, + boolean isSuper, + boolean isCounter, + boolean isDense, + boolean isCompound, + boolean isView, + List<ColumnDefinition> partitionKeyColumns, + List<ColumnDefinition> 
clusteringColumns, + PartitionColumns partitionColumns, + IPartitioner partitioner) + { + this.cfId = cfId; + this.ksName = keyspace; + this.cfName = name; + ksAndCFName = Pair.create(keyspace, name); + byte[] ksBytes = FBUtilities.toWriteUTFBytes(ksName); + byte[] cfBytes = FBUtilities.toWriteUTFBytes(cfName); + ksAndCFBytes = Arrays.copyOf(ksBytes, ksBytes.length + cfBytes.length); + System.arraycopy(cfBytes, 0, ksAndCFBytes, ksBytes.length, cfBytes.length); - cfType = type; - comparator = comp; + this.isDense = isDense; + this.isCompound = isCompound; + this.isSuper = isSuper; + this.isCounter = isCounter; + this.isView = isView; + + EnumSet<Flag> flags = EnumSet.noneOf(Flag.class); + if (isSuper) + flags.add(Flag.SUPER); + if (isCounter) + flags.add(Flag.COUNTER); + if (isDense) + flags.add(Flag.DENSE); + if (isCompound) + flags.add(Flag.COMPOUND); + this.flags = Sets.immutableEnumSet(flags); + + isIndex = cfName.contains("."); + + assert partitioner != null; + this.partitioner = partitioner; + + // A compact table should always have a clustering + assert isCQLTable() || !clusteringColumns.isEmpty() : String.format("For table %s.%s, isDense=%b, isCompound=%b, clustering=%s", ksName, cfName, isDense, isCompound, clusteringColumns); + + this.partitionKeyColumns = partitionKeyColumns; + this.clusteringColumns = clusteringColumns; + this.partitionColumns = partitionColumns; + + this.serializers = new Serializers(this); + rebuild(); + } + + // This rebuild informations that are intrinsically duplicate of the table definition but + // are kept because they are often useful in a different format. + private void rebuild() + { + this.comparator = new ClusteringComparator(extractTypes(clusteringColumns)); + + this.columnMetadata.clear(); + for (ColumnDefinition def : partitionKeyColumns) + this.columnMetadata.put(def.name.bytes, def); + for (ColumnDefinition def : clusteringColumns) + { + this.columnMetadata.put(def.name.bytes, def); + def.type.checkComparable(); + } + for (ColumnDefinition def : partitionColumns) + this.columnMetadata.put(def.name.bytes, def); + + List<AbstractType<?>> keyTypes = extractTypes(partitionKeyColumns); + this.keyValidator = keyTypes.size() == 1 ? keyTypes.get(0) : CompositeType.getInstance(keyTypes); + + if (isCompactTable()) + this.compactValueColumn = CompactTables.getCompactValueColumn(partitionColumns, isSuper()); + } + + public Indexes getIndexes() + { + return indexes; } - public static CFMetaData denseCFMetaData(String keyspace, String name, AbstractType<?> comp, AbstractType<?> subcc) + public static CFMetaData create(String ksName, + String name, + UUID cfId, + boolean isDense, + boolean isCompound, + boolean isSuper, + boolean isCounter, + boolean isView, + List<ColumnDefinition> columns, + IPartitioner partitioner) { - CellNameType cellNameType = CellNames.fromAbstractType(makeRawAbstractType(comp, subcc), true); - return new CFMetaData(keyspace, name, subcc == null ? 
ColumnFamilyType.Standard : ColumnFamilyType.Super, cellNameType); + List<ColumnDefinition> partitions = new ArrayList<>(); + List<ColumnDefinition> clusterings = new ArrayList<>(); + PartitionColumns.Builder builder = PartitionColumns.builder(); + + for (ColumnDefinition column : columns) + { + switch (column.kind) + { + case PARTITION_KEY: + partitions.add(column); + break; + case CLUSTERING: + clusterings.add(column); + break; + default: + builder.add(column); + break; + } + } + + Collections.sort(partitions); + Collections.sort(clusterings); + + return new CFMetaData(ksName, + name, + cfId, + isSuper, + isCounter, + isDense, + isCompound, + isView, + partitions, + clusterings, + builder.build(), + partitioner); } - public static CFMetaData sparseCFMetaData(String keyspace, String name, AbstractType<?> comp) + private static List<AbstractType<?>> extractTypes(List<ColumnDefinition> clusteringColumns) { - CellNameType cellNameType = CellNames.fromAbstractType(comp, false); - return new CFMetaData(keyspace, name, ColumnFamilyType.Standard, cellNameType); + List<AbstractType<?>> types = new ArrayList<>(clusteringColumns.size()); + for (ColumnDefinition def : clusteringColumns) + types.add(def.type); + return types; } - public static CFMetaData denseCFMetaData(String keyspace, String name, AbstractType<?> comp) + public Set<Flag> flags() { - return denseCFMetaData(keyspace, name, comp, null); + return flags; } - public static AbstractType<?> makeRawAbstractType(AbstractType<?> comparator, AbstractType<?> subComparator) + /** + * There is a couple of places in the code where we need a CFMetaData object and don't have one readily available + * and know that only the keyspace and name matter. This creates such "fake" metadata. Use only if you know what + * you're doing. + */ + public static CFMetaData createFake(String keyspace, String name) { - return subComparator == null ? comparator : CompositeType.getInstance(Arrays.asList(comparator, subComparator)); + return CFMetaData.Builder.create(keyspace, name).addPartitionKey("key", BytesType.instance).build(); } - public Map<String, TriggerDefinition> getTriggers() + public Triggers getTriggers() { return triggers; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/config/DatabaseDescriptor.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/config/DatabaseDescriptor.java index 7553c92,c459b5d..7c062a1 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@@ -1538,25 -1480,11 +1538,16 @@@ public class DatabaseDescripto return conf.max_hint_window_in_ms; } + public static File getHintsDirectory() + { + return new File(conf.hints_directory); + } + - public static File getSerializedCachePath(String ksName, - String cfName, - UUID cfId, - CacheService.CacheType cacheType, - String version, - String extension) - { - StringBuilder builder = new StringBuilder(); - builder.append(ksName).append('-'); - builder.append(cfName).append('-'); - builder.append(ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes(cfId))).append('-'); - builder.append(cacheType); - builder.append((version == null ? "" : "-" + version + "." + extension)); - return new File(conf.saved_caches_directory, builder.toString()); + public static File getSerializedCachePath(CacheService.CacheType cacheType, String version, String extension) + { + String name = cacheType.toString() + + (version == null ? 
"" : "-" + version + "." + extension); + return new File(conf.saved_caches_directory, name); } public static int getDynamicUpdateInterval() http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/config/Schema.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/config/Schema.java index bcde978,00c9358..bcfac1b --- a/src/java/org/apache/cassandra/config/Schema.java +++ b/src/java/org/apache/cassandra/config/Schema.java @@@ -27,16 -26,17 +27,17 @@@ import com.google.common.collect.Sets import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.cql3.functions.Functions; -import org.apache.cassandra.cql3.functions.UDAggregate; -import org.apache.cassandra.cql3.functions.UDFunction; -import org.apache.cassandra.db.*; +import org.apache.cassandra.cql3.functions.*; +import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.SystemKeyspace; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.UserType; -import org.apache.cassandra.db.index.SecondaryIndex; ++import org.apache.cassandra.index.Index; import org.apache.cassandra.io.sstable.Descriptor; -import org.apache.cassandra.schema.LegacySchemaTables; +import org.apache.cassandra.schema.*; import org.apache.cassandra.service.MigrationManager; import org.apache.cassandra.utils.ConcurrentBiMap; import org.apache.cassandra.utils.Pair; @@@ -164,6 -156,53 +165,55 @@@ public class Schem return keyspaceInstances.get(keyspaceName); } + /** + * Retrieve a CFS by name even if that CFS is an index + * + * An index is identified by looking for '.' in the CF name and separating to find the base table + * containing the index + * @param ksNameAndCFName + * @return The named CFS or null if the keyspace, base table, or index don't exist + */ + public ColumnFamilyStore getColumnFamilyStoreIncludingIndexes(Pair<String, String> ksNameAndCFName) { + String ksName = ksNameAndCFName.left; + String cfName = ksNameAndCFName.right; + Pair<String, String> baseTable; + + /* + * Split does special case a one character regex, and it looks like it can detect + * if you use two characters to escape '.', but it still allocates a useless array. + */ + int indexOfSeparator = cfName.indexOf('.'); + if (indexOfSeparator > -1) + baseTable = Pair.create(ksName, cfName.substring(0, indexOfSeparator)); + else + baseTable = ksNameAndCFName; + + UUID cfId = cfIdMap.get(baseTable); + if (cfId == null) + return null; + + Keyspace ks = keyspaceInstances.get(ksName); + if (ks == null) + return null; + + ColumnFamilyStore baseCFS = ks.getColumnFamilyStore(cfId); + + //Not an index + if (indexOfSeparator == -1) + return baseCFS; + + if (baseCFS == null) + return null; + - SecondaryIndex index = baseCFS.indexManager.getIndexByName(cfName); ++ Index index = baseCFS.indexManager.getIndexByName(cfName.substring(indexOfSeparator + 1, cfName.length())); + if (index == null) + return null; + - return index.getIndexCfs(); ++ //Shouldn't ask for a backing table if there is none so just throw? ++ //Or should it return null? 
++ return index.getBackingTable().get(); + } + public ColumnFamilyStore getColumnFamilyStoreInstance(UUID cfId) { Pair<String, String> pair = cfIdMap.inverse().get(cfId); http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 0d6d801,a8a8910..cdb9770 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -395,12 -391,8 +393,9 @@@ public class ColumnFamilyStore implemen data.addInitialSSTables(sstables); } - if (caching.cacheKeys()) - CacheService.instance.keyCache.loadSaved(this); - // compaction strategy should be created after the CFS has been prepared - this.compactionStrategyWrapper = new WrappingCompactionStrategy(this); + this.compactionStrategyManager = new CompactionStrategyManager(this); + this.directories = this.compactionStrategyManager.getDirectories(); if (maxCompactionThreshold.value() <= 0 || minCompactionThreshold.value() <=0) { @@@ -620,47 -610,104 +615,14 @@@ } // also clean out any index leftovers. - for (ColumnDefinition def : metadata.allColumns()) - { - if (def.isIndexed()) - { - CellNameType indexComparator = SecondaryIndex.getIndexComparator(metadata, def); - if (indexComparator != null) - { - CFMetaData indexMetadata = CFMetaData.newIndexMetadata(metadata, def, indexComparator); - scrubDataDirectories(indexMetadata); - } - } - } - } - - /** - * Replacing compacted sstables is atomic as far as observers of Tracker are concerned, but not on the - * filesystem: first the new sstables are renamed to "live" status (i.e., the tmp marker is removed), then - * their ancestors are removed. - * - * If an unclean shutdown happens at the right time, we can thus end up with both the new ones and their - * ancestors "live" in the system. This is harmless for normal data, but for counters it can cause overcounts. - * - * To prevent this, we record sstables being compacted in the system keyspace. If we find unfinished - * compactions, we remove the new ones (since those may be incomplete -- under LCS, we may create multiple - * sstables from any given ancestor). 
- */ - public static void removeUnfinishedCompactionLeftovers(CFMetaData metadata, Map<Integer, UUID> unfinishedCompactions) - { - Directories directories = new Directories(metadata); - Set<Integer> allGenerations = new HashSet<>(); - for (Descriptor desc : directories.sstableLister().list().keySet()) - allGenerations.add(desc.generation); - - // sanity-check unfinishedCompactions - Set<Integer> unfinishedGenerations = unfinishedCompactions.keySet(); - if (!allGenerations.containsAll(unfinishedGenerations)) - { - HashSet<Integer> missingGenerations = new HashSet<>(unfinishedGenerations); - missingGenerations.removeAll(allGenerations); - logger.debug("Unfinished compactions of {}.{} reference missing sstables of generations {}", - metadata.ksName, metadata.cfName, missingGenerations); - } - - // remove new sstables from compactions that didn't complete, and compute - // set of ancestors that shouldn't exist anymore - Set<Integer> completedAncestors = new HashSet<>(); - for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().skipTemporary(true).list().entrySet()) - { - Descriptor desc = sstableFiles.getKey(); - - Set<Integer> ancestors; - try - { - CompactionMetadata compactionMetadata = (CompactionMetadata) desc.getMetadataSerializer().deserialize(desc, MetadataType.COMPACTION); - ancestors = compactionMetadata.ancestors; - } - catch (IOException e) - { - throw new FSReadError(e, desc.filenameFor(Component.STATS)); - } - catch (NullPointerException e) - { - throw new FSReadError(e, "Failed to remove unfinished compaction leftovers (file: " + desc.filenameFor(Component.STATS) + "). See log for details."); - } - - if (!ancestors.isEmpty() - && unfinishedGenerations.containsAll(ancestors) - && allGenerations.containsAll(ancestors)) - { - // any of the ancestors would work, so we'll just lookup the compaction task ID with the first one - UUID compactionTaskID = unfinishedCompactions.get(ancestors.iterator().next()); - assert compactionTaskID != null; - logger.debug("Going to delete unfinished compaction product {}", desc); - SSTable.delete(desc, sstableFiles.getValue()); - SystemKeyspace.finishCompaction(compactionTaskID); - } - else - { - completedAncestors.addAll(ancestors); - } - } - - // remove old sstables from compactions that did complete - for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().list().entrySet()) - { - Descriptor desc = sstableFiles.getKey(); - if (completedAncestors.contains(desc.generation)) + for (IndexMetadata index : metadata.getIndexes()) + if (!index.isCustom()) { - // if any of the ancestors were participating in a compaction, finish that compaction - logger.debug("Going to delete leftover compaction ancestor {}", desc); - SSTable.delete(desc, sstableFiles.getValue()); - UUID compactionTaskID = unfinishedCompactions.get(desc.generation); - if (compactionTaskID != null) - SystemKeyspace.finishCompaction(unfinishedCompactions.get(desc.generation)); + CFMetaData indexMetadata = CassandraIndex.indexCfsMetadata(metadata, index); + scrubDataDirectories(indexMetadata); } - } } - // must be called after all sstables are loaded since row cache merges all row versions - public void init() - { - if (!isRowCacheEnabled()) - return; - - long start = System.nanoTime(); - - int cachedRowsRead = CacheService.instance.rowCache.loadSaved(this); - if (cachedRowsRead > 0) - logger.info("Completed loading ({} ms; {} keys) row cache for {}.{}", - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), - cachedRowsRead, - 
keyspace.getName(), - name); - } - - public void initCounterCache() - { - if (!metadata.isCounter() || CacheService.instance.counterCache.getCapacity() == 0) - return; - - long start = System.nanoTime(); - - int cachedShardsRead = CacheService.instance.counterCache.loadSaved(this); - if (cachedShardsRead > 0) - logger.info("Completed loading ({} ms; {} shards) counter cache for {}.{}", - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), - cachedShardsRead, - keyspace.getName(), - name); - } - /** * See #{@code StorageService.loadNewSSTables(String, String)} for more info * @@@ -1167,8 -1226,8 +1129,8 @@@ if (!isRowCacheEnabled()) return; - RowCacheKey cacheKey = new RowCacheKey(metadata.cfId, key); + RowCacheKey cacheKey = new RowCacheKey(metadata.ksAndCFName, key); - invalidateCachedRow(cacheKey); + invalidateCachedPartition(cacheKey); } /** @@@ -1555,9 -2040,9 +1517,9 @@@ keyIter.hasNext(); ) { RowCacheKey key = keyIter.next(); - DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.key)); + DecoratedKey dk = decorateKey(ByteBuffer.wrap(key.key)); - if (key.cfId.equals(metadata.cfId) && !Range.isInRanges(dk.getToken(), ranges)) + if (key.ksAndCFName.equals(metadata.ksAndCFName) && !Range.isInRanges(dk.getToken(), ranges)) - invalidateCachedRow(dk); + invalidateCachedPartition(dk); } if (metadata.isCounter()) @@@ -1566,8 -2051,8 +1528,8 @@@ keyIter.hasNext(); ) { CounterCacheKey key = keyIter.next(); - DecoratedKey dk = partitioner.decorateKey(ByteBuffer.wrap(key.partitionKey)); + DecoratedKey dk = decorateKey(ByteBuffer.wrap(key.partitionKey)); - if (key.cfId.equals(metadata.cfId) && !Range.isInRanges(dk.getToken(), ranges)) + if (key.ksAndCFName.equals(metadata.ksAndCFName) && !Range.isInRanges(dk.getToken(), ranges)) CacheService.instance.counterCache.remove(key); } } @@@ -1768,9 -2501,9 +1730,8 @@@ { if (!isRowCacheEnabled()) return null; -- - IRowCacheEntry cached = CacheService.instance.rowCache.getInternal(new RowCacheKey(metadata.cfId, key)); + IRowCacheEntry cached = CacheService.instance.rowCache.getInternal(new RowCacheKey(metadata.ksAndCFName, key)); - return cached == null || cached instanceof RowCacheSentinel ? null : (ColumnFamily)cached; + return cached == null || cached instanceof RowCacheSentinel ? 
null : (CachedPartition)cached; } private void invalidateCaches() @@@ -1784,37 -2517,37 +1745,36 @@@ /** * @return true if @param key is contained in the row cache */ - public boolean containsCachedRow(DecoratedKey key) + public boolean containsCachedParition(DecoratedKey key) { - return CacheService.instance.rowCache.getCapacity() != 0 && CacheService.instance.rowCache.containsKey(new RowCacheKey(metadata.cfId, key)); + return CacheService.instance.rowCache.getCapacity() != 0 && CacheService.instance.rowCache.containsKey(new RowCacheKey(metadata.ksAndCFName, key)); } - public void invalidateCachedRow(RowCacheKey key) + public void invalidateCachedPartition(RowCacheKey key) { CacheService.instance.rowCache.remove(key); } - public void invalidateCachedRow(DecoratedKey key) + public void invalidateCachedPartition(DecoratedKey key) { -- UUID cfId = Schema.instance.getId(keyspace.getName(), this.name); -- if (cfId == null) -- return; // secondary index ++ if (!Schema.instance.hasCF(metadata.ksAndCFName)) ++ return; //2i don't cache rows - invalidateCachedPartition(new RowCacheKey(cfId, key)); - invalidateCachedRow(new RowCacheKey(metadata.ksAndCFName, key)); ++ invalidateCachedPartition(new RowCacheKey(metadata.ksAndCFName, key)); } - public ClockAndCount getCachedCounter(ByteBuffer partitionKey, CellName cellName) + public ClockAndCount getCachedCounter(ByteBuffer partitionKey, Clustering clustering, ColumnDefinition column, CellPath path) { if (CacheService.instance.counterCache.getCapacity() == 0L) // counter cache disabled. return null; - return CacheService.instance.counterCache.get(CounterCacheKey.create(metadata.cfId, partitionKey, clustering, column, path)); - return CacheService.instance.counterCache.get(CounterCacheKey.create(metadata.ksAndCFName, partitionKey, cellName)); ++ return CacheService.instance.counterCache.get(CounterCacheKey.create(metadata.ksAndCFName, partitionKey, clustering, column, path)); } - public void putCachedCounter(ByteBuffer partitionKey, CellName cellName, ClockAndCount clockAndCount) + public void putCachedCounter(ByteBuffer partitionKey, Clustering clustering, ColumnDefinition column, CellPath path, ClockAndCount clockAndCount) { if (CacheService.instance.counterCache.getCapacity() == 0L) // counter cache disabled. 
return; - CacheService.instance.counterCache.put(CounterCacheKey.create(metadata.cfId, partitionKey, clustering, column, path), clockAndCount); - CacheService.instance.counterCache.put(CounterCacheKey.create(metadata.ksAndCFName, partitionKey, cellName), clockAndCount); ++ CacheService.instance.counterCache.put(CounterCacheKey.create(metadata.ksAndCFName, partitionKey, clustering, column, path), clockAndCount); } public void forceMajorCompaction() throws InterruptedException, ExecutionException @@@ -2237,7 -2934,17 +2197,20 @@@ public boolean isRowCacheEnabled() { - return metadata.params.caching.cacheRows() && CacheService.instance.rowCache.getCapacity() > 0; - return metadata.getCaching().rowCache.isEnabled() && CacheService.instance.rowCache.getCapacity() > 0; ++ ++ boolean retval = metadata.params.caching.cacheRows() && CacheService.instance.rowCache.getCapacity() > 0; ++ assert(!retval || !isIndex()); ++ return retval; + } + + public boolean isCounterCacheEnabled() + { + return metadata.isCounter() && CacheService.instance.counterCache.getCapacity() > 0; + } + + public boolean isKeyCacheEnabled() + { - return metadata.getCaching().keyCache.isEnabled() && CacheService.instance.keyCache.getCapacity() > 0; ++ return metadata.params.caching.cacheKeys() && CacheService.instance.keyCache.getCapacity() > 0; } /** http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/db/RowIndexEntry.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/RowIndexEntry.java index 43dc80c,f9d8c6d..198e890 --- a/src/java/org/apache/cassandra/db/RowIndexEntry.java +++ b/src/java/org/apache/cassandra/db/RowIndexEntry.java @@@ -227,23 -149,15 +227,23 @@@ public class RowIndexEntry<T> implement } } - public static void skip(DataInput in) throws IOException + // Reads only the data 'position' of the index entry and returns it. Note that this left 'in' in the middle + // of reading an entry, so this is only useful if you know what you are doing and in most case 'deserialize' + // should be used instead. + public static long readPosition(DataInputPlus in, Version version) throws IOException { - in.readLong(); - skipPromotedIndex(in); + return version.storeRows() ? in.readUnsignedVInt() : in.readLong(); } - private static void skipPromotedIndex(DataInput in) throws IOException + public static void skip(DataInputPlus in, Version version) throws IOException { - int size = in.readInt(); + readPosition(in, version); + skipPromotedIndex(in, version); + } + - public static void skipPromotedIndex(DataInputPlus in, Version version) throws IOException ++ private static void skipPromotedIndex(DataInputPlus in, Version version) throws IOException + { + int size = version.storeRows() ? 
(int)in.readUnsignedVInt() : in.readInt(); if (size <= 0) return; http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index c08ef6a,0000000..cd01748 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@@ -1,527 -1,0 +1,526 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; + +import org.apache.cassandra.cache.IRowCacheEntry; +import org.apache.cassandra.cache.RowCacheKey; +import org.apache.cassandra.cache.RowCacheSentinel; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterators; +import org.apache.cassandra.exceptions.RequestExecutionException; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.metrics.TableMetrics; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.service.CacheService; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.service.pager.*; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.concurrent.OpOrder; + +/** + * A read command that selects a (part of a) single partition. 
+ */ +public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter> extends ReadCommand +{ + protected static final SelectionDeserializer selectionDeserializer = new Deserializer(); + + private final DecoratedKey partitionKey; + private final F clusteringIndexFilter; + + protected SinglePartitionReadCommand(boolean isDigest, + int digestVersion, + boolean isForThrift, + CFMetaData metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + DecoratedKey partitionKey, + F clusteringIndexFilter) + { + super(Kind.SINGLE_PARTITION, isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits); + assert partitionKey.getPartitioner() == metadata.partitioner; + this.partitionKey = partitionKey; + this.clusteringIndexFilter = clusteringIndexFilter; + } + + /** + * Creates a new read command on a single partition. + * + * @param metadata the table to query. + * @param nowInSec the time in seconds to use are "now" for this query. + * @param columnFilter the column filter to use for the query. + * @param rowFilter the row filter to use for the query. + * @param limits the limits to use for the query. + * @param partitionKey the partition key for the partition to query. + * @param clusteringIndexFilter the clustering index filter to use for the query. + * + * @return a newly created read command. + */ + public static SinglePartitionReadCommand<?> create(CFMetaData metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + DecoratedKey partitionKey, + ClusteringIndexFilter clusteringIndexFilter) + { + return create(false, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, clusteringIndexFilter); + } + + /** + * Creates a new read command on a single partition for thrift. + * + * @param isForThrift whether the query is for thrift or not. + * @param metadata the table to query. + * @param nowInSec the time in seconds to use are "now" for this query. + * @param columnFilter the column filter to use for the query. + * @param rowFilter the row filter to use for the query. + * @param limits the limits to use for the query. + * @param partitionKey the partition key for the partition to query. + * @param clusteringIndexFilter the clustering index filter to use for the query. + * + * @return a newly created read command. + */ + public static SinglePartitionReadCommand<?> create(boolean isForThrift, + CFMetaData metadata, + int nowInSec, + ColumnFilter columnFilter, + RowFilter rowFilter, + DataLimits limits, + DecoratedKey partitionKey, + ClusteringIndexFilter clusteringIndexFilter) + { + if (clusteringIndexFilter instanceof ClusteringIndexSliceFilter) + return new SinglePartitionSliceCommand(false, 0, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, (ClusteringIndexSliceFilter) clusteringIndexFilter); + + assert clusteringIndexFilter instanceof ClusteringIndexNamesFilter; + return new SinglePartitionNamesCommand(false, 0, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, partitionKey, (ClusteringIndexNamesFilter) clusteringIndexFilter); + } + + /** + * Creates a new read command on a single partition. + * + * @param metadata the table to query. + * @param nowInSec the time in seconds to use are "now" for this query. + * @param key the partition key for the partition to query. + * @param columnFilter the column filter to use for the query. + * @param filter the clustering index filter to use for the query. 
+ * + * @return a newly created read command. The returned command will use no row filter and have no limits. + */ + public static SinglePartitionReadCommand<?> create(CFMetaData metadata, int nowInSec, DecoratedKey key, ColumnFilter columnFilter, ClusteringIndexFilter filter) + { + return create(metadata, nowInSec, columnFilter, RowFilter.NONE, DataLimits.NONE, key, filter); + } + + /** + * Creates a new read command that queries a single partition in its entirety. + * + * @param metadata the table to query. + * @param nowInSec the time in seconds to use are "now" for this query. + * @param key the partition key for the partition to query. + * + * @return a newly created read command that queries all the rows of {@code key}. + */ + public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, DecoratedKey key) + { + return SinglePartitionSliceCommand.create(metadata, nowInSec, key, Slices.ALL); + } + + /** + * Creates a new read command that queries a single partition in its entirety. + * + * @param metadata the table to query. + * @param nowInSec the time in seconds to use are "now" for this query. + * @param key the partition key for the partition to query. + * + * @return a newly created read command that queries all the rows of {@code key}. + */ + public static SinglePartitionReadCommand fullPartitionRead(CFMetaData metadata, int nowInSec, ByteBuffer key) + { + return SinglePartitionSliceCommand.create(metadata, nowInSec, metadata.decorateKey(key), Slices.ALL); + } + + public DecoratedKey partitionKey() + { + return partitionKey; + } + + public F clusteringIndexFilter() + { + return clusteringIndexFilter; + } + + public ClusteringIndexFilter clusteringIndexFilter(DecoratedKey key) + { + return clusteringIndexFilter; + } + + public long getTimeout() + { + return DatabaseDescriptor.getReadRpcTimeout(); + } + + public boolean selects(DecoratedKey partitionKey, Clustering clustering) + { + if (!partitionKey().equals(partitionKey)) + return false; + + if (clustering == Clustering.STATIC_CLUSTERING) + return !columnFilter().fetchedColumns().statics.isEmpty(); + + return clusteringIndexFilter().selects(clustering); + } + + /** + * Returns a new command suitable to paging from the last returned row. + * + * @param lastReturned the last row returned by the previous page. The newly created command + * will only query row that comes after this (in query order). This can be {@code null} if this + * is the first page. + * @param pageSize the size to use for the page to query. + * + * @return the newly create command. + */ + public SinglePartitionReadCommand forPaging(Clustering lastReturned, int pageSize) + { + // We shouldn't have set digest yet when reaching that point + assert !isDigestQuery(); + return create(isForThrift(), + metadata(), + nowInSec(), + columnFilter(), + rowFilter(), + limits().forPaging(pageSize), + partitionKey(), + lastReturned == null ? 
clusteringIndexFilter() : clusteringIndexFilter.forPaging(metadata().comparator, lastReturned, false)); + } + + public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException + { + return StorageProxy.read(Group.one(this), consistency, clientState); + } + + public SinglePartitionPager getPager(PagingState pagingState) + { + return getPager(this, pagingState); + } + + private static SinglePartitionPager getPager(SinglePartitionReadCommand command, PagingState pagingState) + { + return new SinglePartitionPager(command, pagingState); + } + + protected void recordLatency(TableMetrics metric, long latencyNanos) + { + metric.readLatency.addNano(latencyNanos); + } + + protected UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, ReadOrderGroup orderGroup) + { + @SuppressWarnings("resource") // we close the created iterator through closing the result of this method (and SingletonUnfilteredPartitionIterator ctor cannot fail) + UnfilteredRowIterator partition = cfs.isRowCacheEnabled() + ? getThroughCache(cfs, orderGroup.baseReadOpOrderGroup()) + : queryMemtableAndDisk(cfs, orderGroup.baseReadOpOrderGroup()); + return new SingletonUnfilteredPartitionIterator(partition, isForThrift()); + } + + /** + * Fetch the rows requested if in cache; if not, read it from disk and cache it. + * <p> + * If the partition is cached, and the filter given is within its bounds, we return + * from cache, otherwise from disk. + * <p> + * If the partition is is not cached, we figure out what filter is "biggest", read + * that from disk, then filter the result and either cache that or return it. + */ + private UnfilteredRowIterator getThroughCache(ColumnFamilyStore cfs, OpOrder.Group readOp) + { + assert !cfs.isIndex(); // CASSANDRA-5732 + assert cfs.isRowCacheEnabled() : String.format("Row cache is not enabled on table [%s]", cfs.name); + - UUID cfId = metadata().cfId; - RowCacheKey key = new RowCacheKey(cfId, partitionKey()); ++ RowCacheKey key = new RowCacheKey(metadata().ksAndCFName, partitionKey()); + + // Attempt a sentinel-read-cache sequence. if a write invalidates our sentinel, we'll return our + // (now potentially obsolete) data, but won't cache it. see CASSANDRA-3862 + // TODO: don't evict entire partitions on writes (#2864) + IRowCacheEntry cached = CacheService.instance.rowCache.get(key); + if (cached != null) + { + if (cached instanceof RowCacheSentinel) + { + // Some other read is trying to cache the value, just do a normal non-caching read + Tracing.trace("Row cache miss (race)"); + cfs.metric.rowCacheMiss.inc(); + return queryMemtableAndDisk(cfs, readOp); + } + + CachedPartition cachedPartition = (CachedPartition)cached; + if (cfs.isFilterFullyCoveredBy(clusteringIndexFilter(), limits(), cachedPartition, nowInSec())) + { + cfs.metric.rowCacheHit.inc(); + Tracing.trace("Row cache hit"); + return clusteringIndexFilter().getUnfilteredRowIterator(columnFilter(), cachedPartition); + } + + cfs.metric.rowCacheHitOutOfRange.inc(); + Tracing.trace("Ignoring row cache as cached value could not satisfy query"); + return queryMemtableAndDisk(cfs, readOp); + } + + cfs.metric.rowCacheMiss.inc(); + Tracing.trace("Row cache miss"); + + boolean cacheFullPartitions = metadata().params.caching.cacheAllRows(); + + // To be able to cache what we read, what we read must at least covers what the cache holds, that + // is the 'rowsToCache' first rows of the partition. 
We could read those 'rowsToCache' first rows + // systematically, but we'd have to "extend" that to whatever is needed for the user query that the + // 'rowsToCache' first rows don't cover and it's not trivial with our existing filters. So currently + // we settle for caching what we read only if the user query does query the head of the partition since + // that's the common case of when we'll be able to use the cache anyway. One exception is if we cache + // full partitions, in which case we just always read it all and cache. + if (cacheFullPartitions || clusteringIndexFilter().isHeadFilter()) + { + RowCacheSentinel sentinel = new RowCacheSentinel(); + boolean sentinelSuccess = CacheService.instance.rowCache.putIfAbsent(key, sentinel); + boolean sentinelReplaced = false; + + try + { + int rowsToCache = metadata().params.caching.rowsPerPartitionToCache(); + @SuppressWarnings("resource") // we close on exception or upon closing the result of this method + UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, readOp); + try + { + // We want to cache only rowsToCache rows + CachedPartition toCache = CachedBTreePartition.create(DataLimits.cqlLimits(rowsToCache).filter(iter, nowInSec()), nowInSec()); + if (sentinelSuccess && !toCache.isEmpty()) + { + Tracing.trace("Caching {} rows", toCache.rowCount()); + CacheService.instance.rowCache.replace(key, sentinel, toCache); + // Whether or not the previous replace has worked, our sentinel is not in the cache anymore + sentinelReplaced = true; + } + + // We then re-filter out what this query wants. + // Note that in the case where we don't cache full partitions, it's possible that the current query is interested in more + // than what we've cached, so we can't just use toCache. + UnfilteredRowIterator cacheIterator = clusteringIndexFilter().getUnfilteredRowIterator(columnFilter(), toCache); + if (cacheFullPartitions) + { + // Everything is guaranteed to be in 'toCache', we're done with 'iter' + assert !iter.hasNext(); + iter.close(); + return cacheIterator; + } + return UnfilteredRowIterators.concat(cacheIterator, clusteringIndexFilter().filterNotIndexed(columnFilter(), iter)); + } + catch (RuntimeException | Error e) + { + iter.close(); + throw e; + } + } + finally + { + if (sentinelSuccess && !sentinelReplaced) + cfs.invalidateCachedPartition(key); + } + } + + Tracing.trace("Fetching data but not populating cache as query does not query from the start of the partition"); + return queryMemtableAndDisk(cfs, readOp); + } + + /** + * Queries both memtable and sstables to fetch the result of this query. + * <p> + * Please note that this method: + * 1) does not check the row cache. + * 2) does not apply the query limit, nor the row filter (and so ignore 2ndary indexes). + * Those are applied in {@link ReadCommand#executeLocally}. + * 3) does not record some of the read metrics (latency, scanned cells histograms) nor + * throws TombstoneOverwhelmingException. + * It is publicly exposed because there is a few places where that is exactly what we want, + * but it should be used only where you know you don't need thoses things. + * <p> + * Also note that one must have "started" a {@code OpOrder.Group} on the queried table, and that is + * to enforce that that it is required as parameter, even though it's not explicitlly used by the method. 
+ */ + public UnfilteredRowIterator queryMemtableAndDisk(ColumnFamilyStore cfs, OpOrder.Group readOp) + { + Tracing.trace("Executing single-partition query on {}", cfs.name); + + boolean copyOnHeap = Memtable.MEMORY_POOL.needToCopyOnHeap(); + return queryMemtableAndDiskInternal(cfs, copyOnHeap); + } + + protected abstract UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs, boolean copyOnHeap); + + @Override + public String toString() + { + return String.format("Read(%s.%s columns=%s rowFilter=%s limits=%s key=%s filter=%s, nowInSec=%d)", + metadata().ksName, + metadata().cfName, + columnFilter(), + rowFilter(), + limits(), + metadata().getKeyValidator().getString(partitionKey().getKey()), + clusteringIndexFilter.toString(metadata()), + nowInSec()); + } + + public MessageOut<ReadCommand> createMessage(int version) + { + return new MessageOut<>(MessagingService.Verb.READ, this, version < MessagingService.VERSION_30 ? legacyReadCommandSerializer : serializer); + } + + protected void appendCQLWhereClause(StringBuilder sb) + { + sb.append(" WHERE "); + + sb.append(ColumnDefinition.toCQLString(metadata().partitionKeyColumns())).append(" = "); + DataRange.appendKeyString(sb, metadata().getKeyValidator(), partitionKey().getKey()); + + // We put the row filter first because the clustering index filter can end by "ORDER BY" + if (!rowFilter().isEmpty()) + sb.append(" AND ").append(rowFilter()); + + String filterString = clusteringIndexFilter().toCQLString(metadata()); + if (!filterString.isEmpty()) + sb.append(" AND ").append(filterString); + } + + protected void serializeSelection(DataOutputPlus out, int version) throws IOException + { + metadata().getKeyValidator().writeValue(partitionKey().getKey(), out); + ClusteringIndexFilter.serializer.serialize(clusteringIndexFilter(), out, version); + } + + protected long selectionSerializedSize(int version) + { + return metadata().getKeyValidator().writtenLength(partitionKey().getKey()) + + ClusteringIndexFilter.serializer.serializedSize(clusteringIndexFilter(), version); + } + + /** + * Groups multiple single partition read commands. + */ + public static class Group implements ReadQuery + { + public final List<SinglePartitionReadCommand<?>> commands; + private final DataLimits limits; + private final int nowInSec; + + public Group(List<SinglePartitionReadCommand<?>> commands, DataLimits limits) + { + assert !commands.isEmpty(); + this.commands = commands; + this.limits = limits; + this.nowInSec = commands.get(0).nowInSec(); + for (int i = 1; i < commands.size(); i++) + assert commands.get(i).nowInSec() == nowInSec; + } + + public static Group one(SinglePartitionReadCommand<?> command) + { + return new Group(Collections.<SinglePartitionReadCommand<?>>singletonList(command), command.limits()); + } + + public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState) throws RequestExecutionException + { + return StorageProxy.read(this, consistency, clientState); + } + + public int nowInSec() + { + return nowInSec; + } + + public DataLimits limits() + { + return limits; + } + + public CFMetaData metadata() + { + return commands.get(0).metadata(); + } + + public ReadOrderGroup startOrderGroup() + { + // Note that the only difference between the command in a group must be the partition key on which + // they applied. So as far as ReadOrderGroup is concerned, we can use any of the commands to start one. 
+ return commands.get(0).startOrderGroup(); + } + + public PartitionIterator executeInternal(ReadOrderGroup orderGroup) + { + List<PartitionIterator> partitions = new ArrayList<>(commands.size()); + for (SinglePartitionReadCommand cmd : commands) + partitions.add(cmd.executeInternal(orderGroup)); + + // Because we only have enforce the limit per command, we need to enforce it globally. + return limits.filter(PartitionIterators.concat(partitions), nowInSec); + } + + public QueryPager getPager(PagingState pagingState) + { + if (commands.size() == 1) + return SinglePartitionReadCommand.getPager(commands.get(0), pagingState); + + return new MultiPartitionPager(this, pagingState); + } + + @Override + public String toString() + { + return commands.toString(); + } + } + + private static class Deserializer extends SelectionDeserializer + { + public ReadCommand deserialize(DataInputPlus in, int version, boolean isDigest, int digestVersion, boolean isForThrift, CFMetaData metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits, Optional<IndexMetadata> index) + throws IOException + { + DecoratedKey key = metadata.decorateKey(metadata.getKeyValidator().readValue(in)); + ClusteringIndexFilter filter = ClusteringIndexFilter.serializer.deserialize(in, version, metadata); + if (filter instanceof ClusteringIndexNamesFilter) + return new SinglePartitionNamesCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, (ClusteringIndexNamesFilter)filter); + else + return new SinglePartitionSliceCommand(isDigest, digestVersion, isForThrift, metadata, nowInSec, columnFilter, rowFilter, limits, key, (ClusteringIndexSliceFilter)filter); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index d8ff36a,ce12206..04cdc21 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@@ -1518,12 -1491,16 +1518,12 @@@ public abstract class SSTableReader ext public void cacheKey(DecoratedKey key, RowIndexEntry info) { - CachingOptions caching = metadata.getCaching(); + CachingParams caching = metadata.params.caching; - if (!caching.keyCache.isEnabled() - || keyCache == null - || keyCache.getCapacity() == 0) - { + if (!caching.cacheKeys() || keyCache == null || keyCache.getCapacity() == 0) return; - } - KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.getKey()); + KeyCacheKey cacheKey = new KeyCacheKey(metadata.ksAndCFName, descriptor, key.getKey()); logger.trace("Adding cache entry for {} -> {}", cacheKey, info); keyCache.put(cacheKey, info); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/9218d745/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java ----------------------------------------------------------------------
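Editor's note on the SSTableReader.cacheKey() hunk above: it is representative of the wider change in this merge, where cache keys are built from metadata.ksAndCFName (a keyspace/table name pair) instead of metadata.cfId. The standalone sketch below only illustrates what keying a cache by such a name pair looks like; SimpleKeyCacheKey, its fields, and the main() usage are invented for the example (the real KeyCacheKey also carries the sstable descriptor, omitted here for brevity) and are not the actual Cassandra classes.

// Illustrative only: a stand-in for a cache key built from a (keyspace, table)
// name pair plus key bytes, mirroring the metadata.cfId -> metadata.ksAndCFName switch.
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

final class SimpleKeyCacheKey
{
    final String keyspace;
    final String table;          // together these play the role of ksAndCFName
    final byte[] partitionKey;   // copied, so the key owns its bytes

    SimpleKeyCacheKey(String keyspace, String table, byte[] partitionKey)
    {
        this.keyspace = keyspace;
        this.table = table;
        this.partitionKey = Arrays.copyOf(partitionKey, partitionKey.length);
    }

    @Override
    public boolean equals(Object o)
    {
        if (!(o instanceof SimpleKeyCacheKey))
            return false;
        SimpleKeyCacheKey that = (SimpleKeyCacheKey) o;
        return keyspace.equals(that.keyspace)
            && table.equals(that.table)
            && Arrays.equals(partitionKey, that.partitionKey);
    }

    @Override
    public int hashCode()
    {
        return Objects.hash(keyspace, table, Arrays.hashCode(partitionKey));
    }

    public static void main(String[] args)
    {
        ConcurrentHashMap<SimpleKeyCacheKey, Long> keyCache = new ConcurrentHashMap<>();
        byte[] pk = "some-partition-key".getBytes();

        // Entries are addressable purely by name pair + key bytes...
        keyCache.put(new SimpleKeyCacheKey("ks1", "table1", pk), 42L);

        // ...so an equal key built later looks up the same entry.
        System.out.println(keyCache.get(new SimpleKeyCacheKey("ks1", "table1", pk))); // prints 42
    }
}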

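Editor's note on getThroughCache() in the SinglePartitionReadCommand.java diff above: it relies on the sentinel-read-cache sequence described in its comments (see CASSANDRA-3862): claim the cache slot with a sentinel, perform the expensive read, then publish the result only if the sentinel survived; a concurrent invalidation means the data is still returned but not cached. The sketch below shows that pattern in isolation against a plain ConcurrentMap; SentinelCacheExample, loadFromDisk and the String keys are invented for illustration and are not Cassandra APIs.

// Minimal sketch of the "sentinel, then read, then replace" pattern, under the
// assumptions stated above. Not the Cassandra row cache itself.
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class SentinelCacheExample
{
    // A sentinel marks "somebody is currently loading this entry".
    private static final Object SENTINEL = new Object();

    private final ConcurrentMap<String, Object> cache = new ConcurrentHashMap<>();

    Object get(String key)
    {
        Object cached = cache.get(key);
        if (cached != null && cached != SENTINEL)
            return cached;                      // cache hit
        if (cached == SENTINEL)
            return loadFromDisk(key);           // someone else is loading; read without caching

        // Miss: claim the slot with a sentinel before the expensive read.
        boolean claimed = cache.putIfAbsent(key, SENTINEL) == null;
        Object loaded = loadFromDisk(key);
        if (claimed)
        {
            // Publish only if our sentinel is still there; a concurrent write
            // removes it, signalling that what we just read may already be stale.
            if (!cache.replace(key, SENTINEL, loaded))
                cache.remove(key, SENTINEL);    // no-op if the sentinel is already gone
        }
        return loaded;
    }

    // Writers invalidate by removing the entry (sentinel or real value alike).
    void invalidate(String key)
    {
        cache.remove(key);
    }

    private Object loadFromDisk(String key)
    {
        return "value-for-" + key;              // placeholder for the real memtable/sstable read
    }
}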