http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java index 0243b0d..add4445 100644 --- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java +++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnClusteringKey.java @@ -18,15 +18,13 @@ package org.apache.cassandra.db.index.composites; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.List; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.composites.*; +import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.index.SecondaryIndex; -import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.utils.concurrent.OpOrder; /** @@ -48,67 +46,90 @@ import org.apache.cassandra.utils.concurrent.OpOrder; */ public class CompositesIndexOnClusteringKey extends CompositesIndex { - public static CellNameType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef) + public static void addClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition columnDef) { - // Index cell names are rk ck_0 ... 
ck_{i-1} ck_{i+1} ck_n, so n - // components total (where n is the number of clustering keys) - int ckCount = baseMetadata.clusteringColumns().size(); - List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(ckCount); - types.add(SecondaryIndex.keyComparator); + indexMetadata.addClusteringColumn("partition_key", SecondaryIndex.keyComparator); + + List<ColumnDefinition> cks = baseMetadata.clusteringColumns(); for (int i = 0; i < columnDef.position(); i++) - types.add(baseMetadata.clusteringColumns().get(i).type); - for (int i = columnDef.position() + 1; i < ckCount; i++) - types.add(baseMetadata.clusteringColumns().get(i).type); - return new CompoundDenseCellNameType(types); + { + ColumnDefinition def = cks.get(i); + indexMetadata.addClusteringColumn(def.name, def.type); + } + for (int i = columnDef.position() + 1; i < cks.size(); i++) + { + ColumnDefinition def = cks.get(i); + indexMetadata.addClusteringColumn(def.name, def.type); + } } - protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Cell cell) + protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path) { - return cell.name().get(columnDef.position()); + return clustering.get(columnDef.position()); } - protected Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite columnName) + protected CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path) { - int count = Math.min(baseCfs.metadata.clusteringColumns().size(), columnName.size()); - CBuilder builder = getIndexComparator().prefixBuilder(); + CBuilder builder = CBuilder.create(getIndexComparator()); builder.add(rowKey); - for (int i = 0; i < Math.min(columnDef.position(), count); i++) - builder.add(columnName.get(i)); - for (int i = columnDef.position() + 1; i < count; i++) - builder.add(columnName.get(i)); - return builder.build(); + for (int i = 0; i < Math.min(columnDef.position(), prefix.size()); i++) + builder.add(prefix.get(i)); + for (int i 
= columnDef.position() + 1; i < prefix.size(); i++) + builder.add(prefix.get(i)); + return builder; } - public IndexedEntry decodeEntry(DecoratedKey indexedValue, Cell indexEntry) + public IndexedEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry) { int ckCount = baseCfs.metadata.clusteringColumns().size(); - CBuilder builder = baseCfs.getComparator().builder(); + Clustering clustering = indexEntry.clustering(); + CBuilder builder = CBuilder.create(baseCfs.getComparator()); for (int i = 0; i < columnDef.position(); i++) - builder.add(indexEntry.name().get(i + 1)); + builder.add(clustering.get(i + 1)); builder.add(indexedValue.getKey()); for (int i = columnDef.position() + 1; i < ckCount; i++) - builder.add(indexEntry.name().get(i)); + builder.add(clustering.get(i)); - return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), indexEntry.name().get(0), builder.build()); + return new IndexedEntry(indexedValue, clustering, indexEntry.primaryKeyLivenessInfo().timestamp(), clustering.get(0), builder.build()); } @Override - public boolean indexes(CellName name) + protected boolean indexPrimaryKeyColumn() { - // For now, assume this is only used in CQL3 when we know name has enough component. 
return true; } - public boolean isStale(IndexedEntry entry, ColumnFamily data, long now) + @Override + public boolean indexes(ColumnDefinition c) + { + // Actual indexing for this index type is done through maybeIndex + return false; + } + + public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec) + { + return !data.hasLiveData(nowInSec); + } + + @Override + public void maybeIndex(ByteBuffer partitionKey, Clustering clustering, long timestamp, int ttl, OpOrder.Group opGroup, int nowInSec) + { + if (clustering != Clustering.STATIC_CLUSTERING && clustering.get(columnDef.position()) != null) + insert(partitionKey, clustering, null, SimpleLivenessInfo.forUpdate(timestamp, ttl, nowInSec, indexCfs.metadata), opGroup); + } + + @Override + public void maybeDelete(ByteBuffer partitionKey, Clustering clustering, DeletionTime deletion, OpOrder.Group opGroup) { - return data.hasOnlyTombstones(now); + if (clustering.get(columnDef.position()) != null && !deletion.isLive()) + delete(partitionKey, clustering, null, null, deletion, opGroup); } @Override - public void delete(ByteBuffer rowKey, Cell cell, OpOrder.Group opGroup) + public void delete(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup, int nowInSec) { // We only know that one column of the CQL row has been updated/deleted, but we don't know if the // full row has been deleted so we should not do anything. If it ends up that the whole row has
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java index 1e40710..50e81c4 100644 --- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java +++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKey.java @@ -21,7 +21,7 @@ import java.nio.ByteBuffer; import org.apache.cassandra.cql3.Operator; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.composites.CellName; +import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.marshal.*; /** @@ -38,22 +38,21 @@ public class CompositesIndexOnCollectionKey extends CompositesIndexIncludingColl return ((CollectionType)columnDef.type).nameComparator(); } - protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Cell cell) + protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path) { - return cell.name().get(columnDef.position() + 1); + return path.get(0); } @Override public boolean supportsOperator(Operator operator) { return operator == Operator.CONTAINS_KEY || - operator == Operator.CONTAINS && columnDef.type instanceof SetType; + operator == Operator.CONTAINS && columnDef.type instanceof SetType; } - public boolean isStale(IndexedEntry entry, ColumnFamily data, long now) + public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec) { - CellName name = data.getComparator().create(entry.indexedEntryPrefix, columnDef, entry.indexValue.getKey()); - Cell cell = data.getColumn(name); - return cell == null || !cell.isLive(now); + Cell cell = data.getCell(columnDef, CellPath.create(indexValue)); + return cell == null || 
!cell.isLive(nowInSec); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKeyAndValue.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKeyAndValue.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKeyAndValue.java index 0b7f579..766f803 100644 --- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKeyAndValue.java +++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionKeyAndValue.java @@ -20,7 +20,7 @@ package org.apache.cassandra.db.index.composites; import java.nio.ByteBuffer; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.composites.CellName; +import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.marshal.*; /** @@ -38,50 +38,22 @@ public class CompositesIndexOnCollectionKeyAndValue extends CompositesIndexInclu return CompositeType.getInstance(colType.nameComparator(), colType.valueComparator()); } - @Override - protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Cell cell) - { - final ByteBuffer key = cell.name().get(columnDef.position() + 1); - final ByteBuffer value = cell.value(); - return CompositeType.build(key, value); - } - - @Override - public boolean isStale(IndexedEntry entry, ColumnFamily data, long now) + protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path) { - Cell cell = extractTargetCell(entry, data); - if (cellIsDead(cell, now)) - return true; - ByteBuffer indexCollectionValue = extractCollectionValue(entry); - ByteBuffer targetCollectionValue = cell.value(); - AbstractType<?> valueComparator = ((CollectionType)columnDef.type).valueComparator(); - return valueComparator.compare(indexCollectionValue, targetCollectionValue) != 0; + return 
CompositeType.build(path.get(0), cellValue); } - private Cell extractTargetCell(IndexedEntry entry, ColumnFamily data) + public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec) { - ByteBuffer collectionKey = extractCollectionKey(entry); - CellName name = data.getComparator().create(entry.indexedEntryPrefix, columnDef, collectionKey); - return data.getColumn(name); - } + ByteBuffer[] components = ((CompositeType)getIndexKeyComparator()).split(indexValue); + ByteBuffer mapKey = components[0]; + ByteBuffer mapValue = components[1]; - private ByteBuffer extractCollectionKey(IndexedEntry entry) - { - return extractIndexKeyComponent(entry, 0); - } - - private ByteBuffer extractIndexKeyComponent(IndexedEntry entry, int component) - { - return CompositeType.extractComponent(entry.indexValue.getKey(), component); - } - - private ByteBuffer extractCollectionValue(IndexedEntry entry) - { - return extractIndexKeyComponent(entry, 1); - } + Cell cell = data.getCell(columnDef, CellPath.create(mapKey)); + if (cell == null || !cell.isLive(nowInSec)) + return true; - private boolean cellIsDead(Cell cell, long now) - { - return cell == null || !cell.isLive(now); + AbstractType<?> valueComparator = ((CollectionType)columnDef.type).valueComparator(); + return valueComparator.compare(mapValue, cell.value()) != 0; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java index a11a0d9..5af842c 100644 --- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java +++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnCollectionValue.java @@ -18,19 +18,13 @@ package 
org.apache.cassandra.db.index.composites; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; +import java.util.Iterator; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.cql3.Operator; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.composites.CBuilder; -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.db.composites.CellNameType; -import org.apache.cassandra.db.composites.Composite; -import org.apache.cassandra.db.composites.CompoundDenseCellNameType; -import org.apache.cassandra.db.index.SecondaryIndex; +import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.marshal.*; /** @@ -46,15 +40,12 @@ import org.apache.cassandra.db.marshal.*; */ public class CompositesIndexOnCollectionValue extends CompositesIndex { - public static CellNameType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef) + public static void addClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition columnDef) { - int prefixSize = columnDef.position(); - List<AbstractType<?>> types = new ArrayList<>(prefixSize + 2); - types.add(SecondaryIndex.keyComparator); - for (int i = 0; i < prefixSize; i++) - types.add(baseMetadata.comparator.subtype(i)); - types.add(((CollectionType)columnDef.type).nameComparator()); // collection key - return new CompoundDenseCellNameType(types); + addGenericClusteringColumns(indexMetadata, baseMetadata, columnDef); + + // collection key + indexMetadata.addClusteringColumn("cell_path", ((CollectionType)columnDef.type).nameComparator()); } @Override @@ -63,36 +54,32 @@ public class CompositesIndexOnCollectionValue extends CompositesIndex return ((CollectionType)columnDef.type).valueComparator(); } - protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Cell cell) + protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, 
ByteBuffer cellValue, CellPath path) { - return cell.value(); + return cellValue; } - protected Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite cellName) + protected CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path) { - CBuilder builder = getIndexComparator().prefixBuilder(); + CBuilder builder = CBuilder.create(getIndexComparator()); builder.add(rowKey); - for (int i = 0; i < Math.min(columnDef.position(), cellName.size()); i++) - builder.add(cellName.get(i)); + for (int i = 0; i < prefix.size(); i++) + builder.add(prefix.get(i)); + + // When indexing, cell will be present, but when searching, it won't (CASSANDRA-7525) + if (prefix.size() == baseCfs.metadata.clusteringColumns().size() && path != null) + builder.add(path.get(0)); - // When indexing, cellName is a full name including the collection - // key. When searching, restricted clustering columns are included - // but the collection key is not. In this case, don't try to add an - // element to the builder for it, as it will just end up null and - // error out when retrieving cells from the index cf (CASSANDRA-7525) - if (cellName.size() >= columnDef.position() + 1) - builder.add(cellName.get(columnDef.position() + 1)); - return builder.build(); + return builder; } - public IndexedEntry decodeEntry(DecoratedKey indexedValue, Cell indexEntry) + public IndexedEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry) { - int prefixSize = columnDef.position(); - CellName name = indexEntry.name(); - CBuilder builder = baseCfs.getComparator().builder(); - for (int i = 0; i < prefixSize; i++) - builder.add(name.get(i + 1)); - return new IndexedEntry(indexedValue, name, indexEntry.timestamp(), name.get(0), builder.build(), name.get(prefixSize + 1)); + Clustering clustering = indexEntry.clustering(); + CBuilder builder = CBuilder.create(baseCfs.getComparator()); + for (int i = 0; i < baseCfs.getComparator().size(); i++) + builder.add(clustering.get(i + 1)); + 
return new IndexedEntry(indexedValue, clustering, indexEntry.primaryKeyLivenessInfo().timestamp(), clustering.get(0), builder.build()); } @Override @@ -101,18 +88,15 @@ public class CompositesIndexOnCollectionValue extends CompositesIndex return operator == Operator.CONTAINS && !(columnDef.type instanceof SetType); } - @Override - public boolean indexes(CellName name) - { - AbstractType<?> comp = baseCfs.metadata.getColumnDefinitionComparator(columnDef); - return name.size() > columnDef.position() - && comp.compare(name.get(columnDef.position()), columnDef.name.bytes) == 0; - } - - public boolean isStale(IndexedEntry entry, ColumnFamily data, long now) + public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec) { - CellName name = data.getComparator().create(entry.indexedEntryPrefix, columnDef, entry.indexedEntryCollectionKey); - Cell cell = data.getColumn(name); - return cell == null || !cell.isLive(now) || ((CollectionType) columnDef.type).valueComparator().compare(entry.indexValue.getKey(), cell.value()) != 0; + Iterator<Cell> iter = data.getCells(columnDef); + while (iter.hasNext()) + { + Cell cell = iter.next(); + if (cell.isLive(nowInSec) && ((CollectionType) columnDef.type).valueComparator().compare(indexValue, cell.value()) == 0) + return false; + } + return true; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java index df43057..d48e58b 100644 --- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java +++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnPartitionKey.java @@ -18,14 +18,10 @@ package org.apache.cassandra.db.index.composites; 
import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.composites.*; -import org.apache.cassandra.db.index.SecondaryIndex; +import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.utils.concurrent.OpOrder; @@ -49,57 +45,59 @@ import org.apache.cassandra.utils.concurrent.OpOrder; */ public class CompositesIndexOnPartitionKey extends CompositesIndex { - public static CellNameType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef) - { - int ckCount = baseMetadata.clusteringColumns().size(); - List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(ckCount + 1); - types.add(SecondaryIndex.keyComparator); - for (int i = 0; i < ckCount; i++) - types.add(baseMetadata.comparator.subtype(i)); - return new CompoundDenseCellNameType(types); - } - - protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Cell cell) + protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path) { CompositeType keyComparator = (CompositeType)baseCfs.metadata.getKeyValidator(); ByteBuffer[] components = keyComparator.split(rowKey); return components[columnDef.position()]; } - protected Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite columnName) + protected CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path) { - int count = Math.min(baseCfs.metadata.clusteringColumns().size(), columnName.size()); - CBuilder builder = getIndexComparator().prefixBuilder(); + CBuilder builder = CBuilder.create(getIndexComparator()); builder.add(rowKey); - for (int i = 0; i < count; i++) - builder.add(columnName.get(i)); - return builder.build(); + for (int i = 0; i < prefix.size(); i++) + builder.add(prefix.get(i)); + return builder; } - 
public IndexedEntry decodeEntry(DecoratedKey indexedValue, Cell indexEntry) + public IndexedEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry) { int ckCount = baseCfs.metadata.clusteringColumns().size(); - CBuilder builder = baseCfs.getComparator().builder(); + Clustering clustering = indexEntry.clustering(); + CBuilder builder = CBuilder.create(baseCfs.getComparator()); for (int i = 0; i < ckCount; i++) - builder.add(indexEntry.name().get(i + 1)); + builder.add(clustering.get(i + 1)); - return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), indexEntry.name().get(0), builder.build()); + return new IndexedEntry(indexedValue, clustering, indexEntry.primaryKeyLivenessInfo().timestamp(), clustering.get(0), builder.build()); } @Override - public boolean indexes(CellName name) + protected boolean indexPrimaryKeyColumn() { - // Since a partition key is always full, we always index it return true; } - public boolean isStale(IndexedEntry entry, ColumnFamily data, long now) + @Override + public boolean indexes(ColumnDefinition c) + { + // Actual indexing for this index type is done through maybeIndex + return false; + } + + public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec) + { + return !data.hasLiveData(nowInSec); + } + + @Override + public void maybeIndex(ByteBuffer partitionKey, Clustering clustering, long timestamp, int ttl, OpOrder.Group opGroup, int nowInSec) { - return data.hasOnlyTombstones(now); + insert(partitionKey, clustering, null, SimpleLivenessInfo.forUpdate(timestamp, ttl, nowInSec, indexCfs.metadata), opGroup); } @Override - public void delete(ByteBuffer rowKey, Cell cell, OpOrder.Group opGroup) + public void delete(ByteBuffer rowKey, Clustering clustering, Cell cell, OpOrder.Group opGroup, int nowInSec) { // We only know that one column of the CQL row has been updated/deleted, but we don't know if the // full row has been deleted so we should not do anything. 
If it ends up that the whole row has http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java index b9dc07f..a88502a 100644 --- a/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java +++ b/src/java/org/apache/cassandra/db/index/composites/CompositesIndexOnRegular.java @@ -18,15 +18,9 @@ package org.apache.cassandra.db.index.composites; import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.composites.*; -import org.apache.cassandra.db.index.SecondaryIndex; -import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.db.rows.*; /** * Index on a REGULAR column definition on a composite type. 
@@ -47,50 +41,35 @@ import org.apache.cassandra.db.marshal.*; */ public class CompositesIndexOnRegular extends CompositesIndex { - public static CellNameType buildIndexComparator(CFMetaData baseMetadata, ColumnDefinition columnDef) + protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path) { - int prefixSize = columnDef.position(); - List<AbstractType<?>> types = new ArrayList<AbstractType<?>>(prefixSize + 1); - types.add(SecondaryIndex.keyComparator); - for (int i = 0; i < prefixSize; i++) - types.add(baseMetadata.comparator.subtype(i)); - return new CompoundDenseCellNameType(types); + return cellValue; } - protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Cell cell) + protected CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path) { - return cell.value(); - } - - protected Composite makeIndexColumnPrefix(ByteBuffer rowKey, Composite cellName) - { - CBuilder builder = getIndexComparator().prefixBuilder(); + CBuilder builder = CBuilder.create(getIndexComparator()); builder.add(rowKey); - for (int i = 0; i < Math.min(columnDef.position(), cellName.size()); i++) - builder.add(cellName.get(i)); - return builder.build(); - } - - public IndexedEntry decodeEntry(DecoratedKey indexedValue, Cell indexEntry) - { - CBuilder builder = baseCfs.getComparator().builder(); - for (int i = 0; i < columnDef.position(); i++) - builder.add(indexEntry.name().get(i + 1)); - return new IndexedEntry(indexedValue, indexEntry.name(), indexEntry.timestamp(), indexEntry.name().get(0), builder.build()); + for (int i = 0; i < prefix.size(); i++) + builder.add(prefix.get(i)); + return builder; } - @Override - public boolean indexes(CellName name) + public IndexedEntry decodeEntry(DecoratedKey indexedValue, Row indexEntry) { - AbstractType<?> comp = baseCfs.metadata.getColumnDefinitionComparator(columnDef); - return name.size() > columnDef.position() - && 
comp.compare(name.get(columnDef.position()), columnDef.name.bytes) == 0; + Clustering clustering = indexEntry.clustering(); + ClusteringComparator baseComparator = baseCfs.getComparator(); + CBuilder builder = CBuilder.create(baseComparator); + for (int i = 0; i < baseComparator.size(); i++) + builder.add(clustering.get(i + 1)); + return new IndexedEntry(indexedValue, clustering, indexEntry.primaryKeyLivenessInfo().timestamp(), clustering.get(0), builder.build()); } - public boolean isStale(IndexedEntry entry, ColumnFamily data, long now) + public boolean isStale(Row data, ByteBuffer indexValue, int nowInSec) { - CellName name = data.getComparator().create(entry.indexedEntryPrefix, columnDef); - Cell cell = data.getColumn(name); - return cell == null || !cell.isLive(now) || columnDef.type.compare(entry.indexValue.getKey(), cell.value()) != 0; + Cell cell = data.getCell(columnDef); + return cell == null + || !cell.isLive(nowInSec) + || columnDef.type.compare(indexValue, cell.value()) != 0; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java index 88453df..f838ff1 100644 --- a/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java +++ b/src/java/org/apache/cassandra/db/index/composites/CompositesSearcher.java @@ -17,296 +17,224 @@ */ package org.apache.cassandra.db.index.composites; -import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.db.ArrayBackedSortedColumns; -import org.apache.cassandra.db.Cell; -import org.apache.cassandra.db.ColumnFamily; -import org.apache.cassandra.db.ColumnFamilyStore; -import 
org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.IndexExpression; -import org.apache.cassandra.db.Row; -import org.apache.cassandra.db.RowPosition; -import org.apache.cassandra.db.composites.CellNameType; -import org.apache.cassandra.db.composites.Composite; -import org.apache.cassandra.db.composites.Composites; -import org.apache.cassandra.db.filter.ColumnSlice; -import org.apache.cassandra.db.filter.ExtendedFilter; -import org.apache.cassandra.db.filter.IDiskAtomFilter; -import org.apache.cassandra.db.filter.QueryFilter; -import org.apache.cassandra.db.filter.SliceQueryFilter; -import org.apache.cassandra.db.index.SecondaryIndexManager; -import org.apache.cassandra.db.index.SecondaryIndexSearcher; -import org.apache.cassandra.dht.AbstractBounds; -import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.db.index.*; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.utils.concurrent.OpOrder; + public class CompositesSearcher extends SecondaryIndexSearcher { private static final Logger logger = LoggerFactory.getLogger(CompositesSearcher.class); - public CompositesSearcher(SecondaryIndexManager indexManager, Set<ByteBuffer> columns) + public CompositesSearcher(SecondaryIndexManager indexManager, Set<ColumnDefinition> columns) { super(indexManager, columns); } - @Override - public List<Row> search(ExtendedFilter filter) + private boolean isMatchingEntry(DecoratedKey partitionKey, CompositesIndex.IndexedEntry entry, ReadCommand command) { - assert filter.getClause() != null && !filter.getClause().isEmpty(); - final IndexExpression primary = highestSelectivityPredicate(filter.getClause(), true); - final CompositesIndex index = (CompositesIndex)indexManager.getIndexForColumn(primary.column); - // TODO: this 
should perhaps not open and maintain a writeOp for the full duration, but instead only *try* to delete stale entries, without blocking if there's no room - // as it stands, we open a writeOp and keep it open for the duration to ensure that should this CF get flushed to make room we don't block the reclamation of any room being made - try (OpOrder.Group writeOp = baseCfs.keyspace.writeOrder.start(); OpOrder.Group baseOp = baseCfs.readOrdering.start(); OpOrder.Group indexOp = index.getIndexCfs().readOrdering.start()) - { - return baseCfs.filter(getIndexedIterator(writeOp, filter, primary, index), filter); - } + return command.selects(partitionKey, entry.indexedEntryClustering); } - private Composite makePrefix(CompositesIndex index, ByteBuffer key, ExtendedFilter filter, boolean isStart) + protected UnfilteredPartitionIterator queryDataFromIndex(AbstractSimplePerColumnSecondaryIndex secondaryIdx, + final DecoratedKey indexKey, + final RowIterator indexHits, + final ReadCommand command, + final ReadOrderGroup orderGroup) { - if (key.remaining() == 0) - return Composites.EMPTY; + assert indexHits.staticRow() == Rows.EMPTY_STATIC_ROW; - Composite prefix; - IDiskAtomFilter columnFilter = filter.columnFilter(key); - if (columnFilter instanceof SliceQueryFilter) - { - SliceQueryFilter sqf = (SliceQueryFilter)columnFilter; - Composite columnName = isStart ? sqf.start() : sqf.finish(); - prefix = columnName.isEmpty() ? index.getIndexComparator().make(key) : index.makeIndexColumnPrefix(key, columnName); - } - else - { - prefix = index.getIndexComparator().make(key); - } - return isStart ? 
prefix.start() : prefix.end(); - } + assert secondaryIdx instanceof CompositesIndex; + final CompositesIndex index = (CompositesIndex)secondaryIdx; - private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final OpOrder.Group writeOp, final ExtendedFilter filter, final IndexExpression primary, final CompositesIndex index) - { - // Start with the most-restrictive indexed clause, then apply remaining clauses - // to each row matching that clause. - // TODO: allow merge join instead of just one index + loop - assert index != null; - assert index.getIndexCfs() != null; - final DecoratedKey indexKey = index.getIndexKeyFor(primary.value); - - if (logger.isDebugEnabled()) - logger.debug("Most-selective indexed predicate is {}", index.expressionString(primary)); - - /* - * XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of - * the indexed row unfortunately (which will be inefficient), because we have not way to intuit the smallest - * possible key having a given token. A fix would be to actually store the token along the key in the - * indexed row. - */ - final AbstractBounds<RowPosition> range = filter.dataRange.keyRange(); - ByteBuffer startKey = range.left instanceof DecoratedKey ? ((DecoratedKey)range.left).getKey() : ByteBufferUtil.EMPTY_BYTE_BUFFER; - ByteBuffer endKey = range.right instanceof DecoratedKey ? 
((DecoratedKey)range.right).getKey() : ByteBufferUtil.EMPTY_BYTE_BUFFER; - - final CellNameType baseComparator = baseCfs.getComparator(); - final CellNameType indexComparator = index.getIndexCfs().getComparator(); - - final Composite startPrefix = makePrefix(index, startKey, filter, true); - final Composite endPrefix = makePrefix(index, endKey, filter, false); - - return new ColumnFamilyStore.AbstractScanIterator() + return new UnfilteredPartitionIterator() { - private Composite lastSeenPrefix = startPrefix; - private Deque<Cell> indexCells; - private int columnsRead = Integer.MAX_VALUE; - private int limit = filter.currentLimit(); - private int columnsCount = 0; + private CompositesIndex.IndexedEntry nextEntry; - // We have to fetch at least two rows to avoid breaking paging if the first row doesn't satisfy all clauses - private int indexCellsPerQuery = Math.max(2, Math.min(filter.maxColumns(), filter.maxRows())); + private UnfilteredRowIterator next; - public boolean needsFiltering() + public boolean isForThrift() { - return false; + return command.isForThrift(); + } + + public boolean hasNext() + { + return prepareNext(); } - private Row makeReturn(DecoratedKey key, ColumnFamily data) + public UnfilteredRowIterator next() { - if (data == null) - return endOfData(); + if (next == null) + prepareNext(); - assert key != null; - return new Row(key, data); + UnfilteredRowIterator toReturn = next; + next = null; + return toReturn; } - protected Row computeNext() + private boolean prepareNext() { - /* - * Our internal index code is wired toward internal rows. So we need to accumulate all results for a given - * row before returning from this method. Which unfortunately means that this method has to do what - * CFS.filter does for KeysIndex. - */ - DecoratedKey currentKey = null; - ColumnFamily data = null; - Composite previousPrefix = null; - - while (true) - { - // Did we get more columns that needed to respect the user limit? 
- // (but we still need to return what has been fetched already) - if (columnsCount >= limit) - return makeReturn(currentKey, data); + if (next != null) + return true; - if (indexCells == null || indexCells.isEmpty()) + if (nextEntry == null) + { + if (!indexHits.hasNext()) + return false; + + nextEntry = index.decodeEntry(indexKey, indexHits.next()); + } + + // Gather all index hits belonging to the same partition and query the data for those hits. + // TODO: it's much more efficient to do 1 read for all hits to the same partition than doing + // 1 read per index hit. However, this basically mean materializing all hits for a partition + // in memory so we should consider adding some paging mechanism. However, index hits should + // be relatively small so it's much better than the previous code that was materializing all + // *data* for a given partition. + NavigableSet<Clustering> clusterings = new TreeSet<>(baseCfs.getComparator()); + List<CompositesIndex.IndexedEntry> entries = new ArrayList<>(); + DecoratedKey partitionKey = baseCfs.partitioner.decorateKey(nextEntry.indexedKey); + + while (nextEntry != null && partitionKey.getKey().equals(nextEntry.indexedKey)) + { + // We're queried a slice of the index, but some hits may not match some of the clustering column constraints + if (isMatchingEntry(partitionKey, nextEntry, command)) { - if (columnsRead < indexCellsPerQuery) - { - logger.trace("Read only {} (< {}) last page through, must be done", columnsRead, indexCellsPerQuery); - return makeReturn(currentKey, data); - } - - if (logger.isTraceEnabled()) - logger.trace("Scanning index {} starting with {}", - index.expressionString(primary), indexComparator.getString(startPrefix)); - - QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey, - index.getIndexCfs().name, - lastSeenPrefix, - endPrefix, - false, - indexCellsPerQuery, - filter.timestamp); - ColumnFamily indexRow = index.getIndexCfs().getColumnFamily(indexFilter); - if (indexRow == null || 
!indexRow.hasColumns()) - return makeReturn(currentKey, data); - - Collection<Cell> sortedCells = indexRow.getSortedColumns(); - columnsRead = sortedCells.size(); - indexCells = new ArrayDeque<>(sortedCells); - Cell firstCell = sortedCells.iterator().next(); - - // Paging is racy, so it is possible the first column of a page is not the last seen one. - if (lastSeenPrefix != startPrefix && lastSeenPrefix.equals(firstCell.name())) - { - // skip the row we already saw w/ the last page of results - indexCells.poll(); - logger.trace("Skipping {}", indexComparator.getString(firstCell.name())); - } + clusterings.add(nextEntry.indexedEntryClustering); + entries.add(nextEntry); } - while (!indexCells.isEmpty() && columnsCount <= limit) - { - Cell cell = indexCells.poll(); - lastSeenPrefix = cell.name(); - if (!cell.isLive(filter.timestamp)) - { - logger.trace("skipping {}", cell.name()); - continue; - } - - CompositesIndex.IndexedEntry entry = index.decodeEntry(indexKey, cell); - DecoratedKey dk = baseCfs.partitioner.decorateKey(entry.indexedKey); - - // Are we done for this row? - if (currentKey == null) - { - currentKey = dk; - } - else if (!currentKey.equals(dk)) - { - DecoratedKey previousKey = currentKey; - currentKey = dk; - previousPrefix = null; - - // We're done with the previous row, return it if it had data, continue otherwise - indexCells.addFirst(cell); - if (data == null) - continue; - else - return makeReturn(previousKey, data); - } - - if (!range.contains(dk)) - { - // Either we're not yet in the range cause the range is start excluding, or we're - // past it. 
- if (!range.right.isMinimum() && range.right.compareTo(dk) < 0) - { - logger.trace("Reached end of assigned scan range"); - return endOfData(); - } - else - { - logger.debug("Skipping entry {} before assigned scan range", dk.getToken()); - continue; - } - } - - // Check if this entry cannot be a hit due to the original cell filter - Composite start = entry.indexedEntryPrefix; - if (!filter.columnFilter(dk.getKey()).maySelectPrefix(baseComparator, start)) - continue; - - // If we've record the previous prefix, it means we're dealing with an index on the collection value. In - // that case, we can have multiple index prefix for the same CQL3 row. In that case, we want to only add - // the CQL3 row once (because requesting the data multiple time would be inefficient but more importantly - // because we shouldn't count the columns multiple times with the lastCounted() call at the end of this - // method). - if (previousPrefix != null && previousPrefix.equals(start)) - continue; - else - previousPrefix = null; - - if (logger.isTraceEnabled()) - logger.trace("Adding index hit to current row for {}", indexComparator.getString(cell.name())); - - // We always query the whole CQL3 row. In the case where the original filter was a name filter this might be - // slightly wasteful, but this probably doesn't matter in practice and it simplify things. - ColumnSlice dataSlice = new ColumnSlice(start, entry.indexedEntryPrefix.end()); - // If the table has static columns, we must fetch them too as they may need to be returned too. - // Note that this is potentially wasteful for 2 reasons: - // 1) we will retrieve the static parts for each indexed row, even if we have more than one row in - // the same partition. If we were to group data queries to rows on the same slice, which would - // speed up things in general, we would also optimize here since we would fetch static columns only - // once for each group. 
- // 2) at this point we don't know if the user asked for static columns or not, so we might be fetching - // them for nothing. We would however need to ship the list of "CQL3 columns selected" with getRangeSlice - // to be able to know that. - // TODO: we should improve both point above - ColumnSlice[] slices = baseCfs.metadata.hasStaticColumns() - ? new ColumnSlice[]{ baseCfs.metadata.comparator.staticPrefix().slice(), dataSlice } - : new ColumnSlice[]{ dataSlice }; - SliceQueryFilter dataFilter = new SliceQueryFilter(slices, false, Integer.MAX_VALUE, baseCfs.metadata.clusteringColumns().size()); - ColumnFamily newData = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, dataFilter, filter.timestamp)); - if (newData == null || index.isStale(entry, newData, filter.timestamp)) - { - index.delete(entry, writeOp); - continue; - } - - assert newData != null : "An entry with no data should have been considered stale"; - - // We know the entry is not stale and so the entry satisfy the primary clause. So whether - // or not the data satisfies the other clauses, there will be no point to re-check the - // same CQL3 row if we run into another collection value entry for this row. - if (entry.indexedEntryCollectionKey != null) - previousPrefix = start; - - if (!filter.isSatisfiedBy(dk, newData, entry.indexedEntryPrefix, entry.indexedEntryCollectionKey)) - continue; - - if (data == null) - data = ArrayBackedSortedColumns.factory.create(baseCfs.metadata); - data.addAll(newData); - columnsCount += dataFilter.lastCounted(); - } - } - } + nextEntry = indexHits.hasNext() ? index.decodeEntry(indexKey, indexHits.next()) : null; + } + + // Because we've eliminated entries that don't match the clustering columns, it's possible we added nothing + if (clusterings.isEmpty()) + return prepareNext(); + + // Query the gathered index hits. We still need to filter stale hits from the resulting query. 
+ ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(clusterings, false); + SinglePartitionReadCommand dataCmd = new SinglePartitionNamesCommand(baseCfs.metadata, + command.nowInSec(), + command.columnFilter(), + command.rowFilter(), + DataLimits.NONE, + partitionKey, + filter); + @SuppressWarnings("resource") // We close right away if empty, and if it's assign to next it will be called either + // by the next caller of next, or through closing this iterator is this come before. + UnfilteredRowIterator dataIter = filterStaleEntries(dataCmd.queryMemtableAndDisk(baseCfs, orderGroup.baseReadOpOrderGroup()), + index, + indexKey.getKey(), + entries, + orderGroup.writeOpOrderGroup(), + command.nowInSec()); + if (dataIter.isEmpty()) + { + dataIter.close(); + return prepareNext(); + } + + next = dataIter; + return true; + } - public void close() throws IOException {} + public void remove() + { + throw new UnsupportedOperationException(); + } + + public void close() + { + indexHits.close(); + if (next != null) + next.close(); + } + }; + } + + private UnfilteredRowIterator filterStaleEntries(UnfilteredRowIterator dataIter, + final CompositesIndex index, + final ByteBuffer indexValue, + final List<CompositesIndex.IndexedEntry> entries, + final OpOrder.Group writeOp, + final int nowInSec) + { + return new WrappingUnfilteredRowIterator(dataIter) + { + private int entriesIdx; + private Unfiltered next; + + @Override + public boolean hasNext() + { + return prepareNext(); + } + + @Override + public Unfiltered next() + { + if (next == null) + prepareNext(); + + Unfiltered toReturn = next; + next = null; + return toReturn; + } + + private boolean prepareNext() + { + if (next != null) + return true; + + while (next == null && super.hasNext()) + { + next = super.next(); + if (next.kind() != Unfiltered.Kind.ROW) + return true; + + Row row = (Row)next; + CompositesIndex.IndexedEntry entry = findEntry(row.clustering(), writeOp, nowInSec); + if (!index.isStale(row, 
indexValue, nowInSec)) + return true; + + // The entry is stale: delete the entry and ignore otherwise + index.delete(entry, writeOp, nowInSec); + next = null; + } + return false; + } + + private CompositesIndex.IndexedEntry findEntry(Clustering clustering, OpOrder.Group writeOp, int nowInSec) + { + assert entriesIdx < entries.size(); + while (entriesIdx < entries.size()) + { + CompositesIndex.IndexedEntry entry = entries.get(entriesIdx++); + // The entries are in clustering order. So that the requested entry should be the + // next entry, the one at 'entriesIdx'. However, we can have stale entries, entries + // that have no corresponding row in the base table typically because of a range + // tombstone or partition level deletion. Delete such stale entries. + int cmp = metadata().comparator.compare(entry.indexedEntryClustering, clustering); + assert cmp <= 0; // this would means entries are not in clustering order, which shouldn't happen + if (cmp == 0) + return entry; + else + index.delete(entry, writeOp, nowInSec); + } + // entries correspond to the rows we've queried, so we shouldn't have a row that has no corresponding entry. 
+ throw new AssertionError(); + } }; } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java index e771d99..7930bd6 100644 --- a/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java +++ b/src/java/org/apache/cassandra/db/index/keys/KeysIndex.java @@ -20,14 +20,15 @@ package org.apache.cassandra.db.index.keys; import java.nio.ByteBuffer; import java.util.Set; -import org.apache.cassandra.db.Cell; -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.db.composites.CellNames; -import org.apache.cassandra.db.ColumnFamily; +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.index.SecondaryIndex; +import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.index.AbstractSimplePerColumnSecondaryIndex; import org.apache.cassandra.db.index.SecondaryIndexSearcher; -import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.utils.concurrent.OpOrder; /** * Implements a secondary index for a column family using a second column family. 
@@ -39,41 +40,51 @@ import org.apache.cassandra.exceptions.ConfigurationException; */ public class KeysIndex extends AbstractSimplePerColumnSecondaryIndex { - protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Cell cell) + public static void addIndexClusteringColumns(CFMetaData.Builder indexMetadata, CFMetaData baseMetadata, ColumnDefinition cfDef) { - return cell.value(); + indexMetadata.addClusteringColumn("partition_key", SecondaryIndex.keyComparator); } - protected CellName makeIndexColumnName(ByteBuffer rowKey, Cell cell) + @Override + public void indexRow(DecoratedKey key, Row row, OpOrder.Group opGroup, int nowInSec) { - return CellNames.simpleDense(rowKey); - } + super.indexRow(key, row, opGroup, nowInSec); - public SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ByteBuffer> columns) - { - return new KeysSearcher(baseCfs.indexManager, columns); + // This is used when building indexes, in particular when the index is first created. On thrift, this + // potentially means the column definition just got created, and so we need to check if's not a "dynamic" + // row that actually correspond to the index definition. 
+ assert baseCfs.metadata.isCompactTable(); + if (!row.isStatic()) + { + Clustering clustering = row.clustering(); + if (clustering.get(0).equals(columnDef.name.bytes)) + { + Cell cell = row.getCell(baseCfs.metadata.compactValueColumn()); + if (cell != null && cell.isLive(nowInSec)) + insert(key.getKey(), clustering, cell, opGroup); + } + } } - public boolean isIndexEntryStale(ByteBuffer indexedValue, ColumnFamily data, long now) + protected ByteBuffer getIndexedValue(ByteBuffer rowKey, Clustering clustering, ByteBuffer cellValue, CellPath path) { - Cell cell = data.getColumn(data.getComparator().makeCellName(columnDef.name.bytes)); - return cell == null || !cell.isLive(now) || columnDef.type.compare(indexedValue, cell.value()) != 0; + return cellValue; } - public void validateOptions() throws ConfigurationException + protected CBuilder buildIndexClusteringPrefix(ByteBuffer rowKey, ClusteringPrefix prefix, CellPath path) { - // no options used + CBuilder builder = CBuilder.create(getIndexComparator()); + builder.add(rowKey); + return builder; } - public boolean indexes(CellName name) + public SecondaryIndexSearcher createSecondaryIndexSearcher(Set<ColumnDefinition> columns) { - // This consider the full cellName directly - AbstractType<?> comparator = baseCfs.metadata.getColumnDefinitionComparator(columnDef); - return comparator.compare(columnDef.name.bytes, name.toByteBuffer()) == 0; + return new KeysSearcher(baseCfs.indexManager, columns); } - protected AbstractType getExpressionComparator() + public void validateOptions() throws ConfigurationException { - return baseCfs.getComparator().asAbstractType(); + // no options used } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java index 
b4fd0ba..6b53640 100644 --- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java +++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java @@ -17,190 +17,169 @@ */ package org.apache.cassandra.db.index.keys; -import java.io.IOException; import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.Iterator; -import java.util.List; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.composites.CellName; -import org.apache.cassandra.db.composites.CellNameType; -import org.apache.cassandra.db.composites.Composite; -import org.apache.cassandra.db.composites.Composites; -import org.apache.cassandra.db.filter.ExtendedFilter; -import org.apache.cassandra.db.filter.IDiskAtomFilter; -import org.apache.cassandra.db.filter.QueryFilter; +import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.index.*; -import org.apache.cassandra.dht.AbstractBounds; -import org.apache.cassandra.dht.Range; +import org.apache.cassandra.db.filter.*; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.utils.concurrent.OpOrder; public class KeysSearcher extends SecondaryIndexSearcher { private static final Logger logger = LoggerFactory.getLogger(KeysSearcher.class); - public KeysSearcher(SecondaryIndexManager indexManager, Set<ByteBuffer> columns) + public KeysSearcher(SecondaryIndexManager indexManager, Set<ColumnDefinition> columns) { super(indexManager, columns); } - @Override - public List<Row> search(ExtendedFilter filter) + protected UnfilteredPartitionIterator queryDataFromIndex(final AbstractSimplePerColumnSecondaryIndex index, + final DecoratedKey indexKey, + final RowIterator indexHits, + final ReadCommand command, + final ReadOrderGroup orderGroup) { - assert filter.getClause() != null && 
!filter.getClause().isEmpty(); - final IndexExpression primary = highestSelectivityPredicate(filter.getClause(), true); - final SecondaryIndex index = indexManager.getIndexForColumn(primary.column); - // TODO: this should perhaps not open and maintain a writeOp for the full duration, but instead only *try* to delete stale entries, without blocking if there's no room - // as it stands, we open a writeOp and keep it open for the duration to ensure that should this CF get flushed to make room we don't block the reclamation of any room being made - try (OpOrder.Group writeOp = baseCfs.keyspace.writeOrder.start(); OpOrder.Group baseOp = baseCfs.readOrdering.start(); OpOrder.Group indexOp = index.getIndexCfs().readOrdering.start()) + assert indexHits.staticRow() == Rows.EMPTY_STATIC_ROW; + + return new UnfilteredPartitionIterator() { - return baseCfs.filter(getIndexedIterator(writeOp, filter, primary, index), filter); - } - } + private UnfilteredRowIterator next; - private ColumnFamilyStore.AbstractScanIterator getIndexedIterator(final OpOrder.Group writeOp, final ExtendedFilter filter, final IndexExpression primary, final SecondaryIndex index) - { + public boolean isForThrift() + { + return command.isForThrift(); + } - // Start with the most-restrictive indexed clause, then apply remaining clauses - // to each row matching that clause. - // TODO: allow merge join instead of just one index + loop - assert index != null; - assert index.getIndexCfs() != null; - final DecoratedKey indexKey = index.getIndexKeyFor(primary.value); - - if (logger.isDebugEnabled()) - logger.debug("Most-selective indexed predicate is {}", - ((AbstractSimplePerColumnSecondaryIndex) index).expressionString(primary)); - - /* - * XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of - * the indexed row unfortunately (which will be inefficient), because we have not way to intuit the small - * possible key having a given token. 
A fix would be to actually store the token along the key in the - * indexed row. - */ - final AbstractBounds<RowPosition> range = filter.dataRange.keyRange(); - CellNameType type = index.getIndexCfs().getComparator(); - final Composite startKey = range.left instanceof DecoratedKey ? type.make(((DecoratedKey)range.left).getKey()) : Composites.EMPTY; - final Composite endKey = range.right instanceof DecoratedKey ? type.make(((DecoratedKey)range.right).getKey()) : Composites.EMPTY; - - final CellName primaryColumn = baseCfs.getComparator().cellFromByteBuffer(primary.column); - - return new ColumnFamilyStore.AbstractScanIterator() - { - private Composite lastSeenKey = startKey; - private Iterator<Cell> indexColumns; - private int columnsRead = Integer.MAX_VALUE; + public boolean hasNext() + { + return prepareNext(); + } - protected Row computeNext() + public UnfilteredRowIterator next() { - int meanColumns = Math.max(index.getIndexCfs().getMeanColumns(), 1); - // We shouldn't fetch only 1 row as this provides buggy paging in case the first row doesn't satisfy all clauses - int rowsPerQuery = Math.max(Math.min(filter.maxRows(), filter.maxColumns() / meanColumns), 2); - while (true) + if (next == null) + prepareNext(); + + UnfilteredRowIterator toReturn = next; + next = null; + return toReturn; + } + + private boolean prepareNext() + { + while (next == null && indexHits.hasNext()) { - if (indexColumns == null || !indexColumns.hasNext()) + Row hit = indexHits.next(); + DecoratedKey key = baseCfs.partitioner.decorateKey(hit.clustering().get(0)); + + SinglePartitionReadCommand dataCmd = SinglePartitionReadCommand.create(isForThrift(), + baseCfs.metadata, + command.nowInSec(), + command.columnFilter(), + command.rowFilter(), + DataLimits.NONE, + key, + command.clusteringIndexFilter(key)); + @SuppressWarnings("resource") // filterIfStale closes it's iterator if either it materialize it or if it returns null. 
+ // Otherwise, we close right away if empty, and if it's assigned to next it will be called either + // by the next caller of next, or through closing this iterator is this come before. + UnfilteredRowIterator dataIter = filterIfStale(dataCmd.queryMemtableAndDisk(baseCfs, orderGroup.baseReadOpOrderGroup()), + index, + hit, + indexKey.getKey(), + orderGroup.writeOpOrderGroup(), + isForThrift(), + command.nowInSec()); + + if (dataIter != null) { - if (columnsRead < rowsPerQuery) - { - logger.trace("Read only {} (< {}) last page through, must be done", columnsRead, rowsPerQuery); - return endOfData(); - } - - if (logger.isTraceEnabled() && (index instanceof AbstractSimplePerColumnSecondaryIndex)) - logger.trace("Scanning index {} starting with {}", - ((AbstractSimplePerColumnSecondaryIndex)index).expressionString(primary), index.getBaseCfs().metadata.getKeyValidator().getString(startKey.toByteBuffer())); - - QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey, - index.getIndexCfs().name, - lastSeenKey, - endKey, - false, - rowsPerQuery, - filter.timestamp); - ColumnFamily indexRow = index.getIndexCfs().getColumnFamily(indexFilter); - logger.trace("fetched {}", indexRow); - if (indexRow == null) - { - logger.trace("no data, all done"); - return endOfData(); - } - - Collection<Cell> sortedCells = indexRow.getSortedColumns(); - columnsRead = sortedCells.size(); - indexColumns = sortedCells.iterator(); - Cell firstCell = sortedCells.iterator().next(); - - // Paging is racy, so it is possible the first column of a page is not the last seen one. 
- if (lastSeenKey != startKey && lastSeenKey.equals(firstCell.name())) - { - // skip the row we already saw w/ the last page of results - indexColumns.next(); - logger.trace("Skipping {}", baseCfs.metadata.getKeyValidator().getString(firstCell.name().toByteBuffer())); - } - else if (range instanceof Range && indexColumns.hasNext() && firstCell.name().equals(startKey)) - { - // skip key excluded by range - indexColumns.next(); - logger.trace("Skipping first key as range excludes it"); - } + if (dataIter.isEmpty()) + dataIter.close(); + else + next = dataIter; } + } + return next != null; + } - while (indexColumns.hasNext()) - { - Cell cell = indexColumns.next(); - lastSeenKey = cell.name(); - if (!cell.isLive(filter.timestamp)) - { - logger.trace("skipping {}", cell.name()); - continue; - } - - DecoratedKey dk = baseCfs.partitioner.decorateKey(lastSeenKey.toByteBuffer()); - if (!range.right.isMinimum() && range.right.compareTo(dk) < 0) - { - logger.trace("Reached end of assigned scan range"); - return endOfData(); - } - if (!range.contains(dk)) - { - logger.trace("Skipping entry {} outside of assigned scan range", dk.getToken()); - continue; - } - - logger.trace("Returning index hit for {}", dk); - ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, filter.columnFilter(lastSeenKey.toByteBuffer()), filter.timestamp)); - // While the column family we'll get in the end should contains the primary clause cell, the initialFilter may not have found it and can thus be null - if (data == null) - data = ArrayBackedSortedColumns.factory.create(baseCfs.metadata); - - // as in CFS.filter - extend the filter to ensure we include the columns - // from the index expressions, just in case they weren't included in the initialFilter - IDiskAtomFilter extraFilter = filter.getExtraFilter(dk, data); - if (extraFilter != null) - { - ColumnFamily cf = baseCfs.getColumnFamily(new QueryFilter(dk, baseCfs.name, extraFilter, filter.timestamp)); - if (cf != null) - 
data.addAll(cf); - } - - if (((KeysIndex)index).isIndexEntryStale(indexKey.getKey(), data, filter.timestamp)) - { - // delete the index entry w/ its own timestamp - Cell dummyCell = new BufferCell(primaryColumn, indexKey.getKey(), cell.timestamp()); - ((PerColumnSecondaryIndex)index).delete(dk.getKey(), dummyCell, writeOp); - continue; - } - return new Row(dk, data); - } - } - } + public void remove() + { + throw new UnsupportedOperationException(); + } - public void close() throws IOException {} + public void close() + { + indexHits.close(); + if (next != null) + next.close(); + } }; } + + private UnfilteredRowIterator filterIfStale(UnfilteredRowIterator iterator, + AbstractSimplePerColumnSecondaryIndex index, + Row indexHit, + ByteBuffer indexedValue, + OpOrder.Group writeOp, + boolean isForThrift, + int nowInSec) + { + if (isForThrift) + { + // The data we got has gone though ThrifResultsMerger, so we're looking for the row whose clustering + // is the indexed name. Ans so we need to materialize the partition. + ArrayBackedPartition result = ArrayBackedPartition.create(iterator); + iterator.close(); + Row data = result.getRow(new SimpleClustering(index.indexedColumn().name.bytes)); + Cell cell = data == null ? null : data.getCell(baseCfs.metadata.compactValueColumn()); + return deleteIfStale(iterator.partitionKey(), cell, index, indexHit, indexedValue, writeOp, nowInSec) + ? null + : result.unfilteredIterator(); + } + else + { + assert iterator.metadata().isCompactTable(); + Row data = iterator.staticRow(); + Cell cell = data == null ? 
null : data.getCell(index.indexedColumn()); + if (deleteIfStale(iterator.partitionKey(), cell, index, indexHit, indexedValue, writeOp, nowInSec)) + { + iterator.close(); + return null; + } + else + { + return iterator; + } + } + } + + private boolean deleteIfStale(DecoratedKey partitionKey, + Cell cell, + AbstractSimplePerColumnSecondaryIndex index, + Row indexHit, + ByteBuffer indexedValue, + OpOrder.Group writeOp, + int nowInSec) + { + if (cell == null || !cell.isLive(nowInSec) || index.indexedColumn().type.compare(indexedValue, cell.value()) != 0) + { + // Index is stale, remove the index entry and ignore + index.delete(partitionKey.getKey(), + new SimpleClustering(index.indexedColumn().name.bytes), + indexedValue, + null, + new SimpleDeletionTime(indexHit.primaryKeyLivenessInfo().timestamp(), nowInSec), + writeOp); + return true; + } + return false; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java b/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java index ff2abcb..52ac227 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java +++ b/src/java/org/apache/cassandra/db/lifecycle/SSTableIntervalTree.java @@ -6,16 +6,16 @@ import java.util.List; import com.google.common.collect.Iterables; -import org.apache.cassandra.db.RowPosition; +import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.utils.Interval; import org.apache.cassandra.utils.IntervalTree; -public class SSTableIntervalTree extends IntervalTree<RowPosition, SSTableReader, Interval<RowPosition, SSTableReader>> +public class SSTableIntervalTree extends IntervalTree<PartitionPosition, SSTableReader, Interval<PartitionPosition, SSTableReader>> { 
private static final SSTableIntervalTree EMPTY = new SSTableIntervalTree(null); - SSTableIntervalTree(Collection<Interval<RowPosition, SSTableReader>> intervals) + SSTableIntervalTree(Collection<Interval<PartitionPosition, SSTableReader>> intervals) { super(intervals); } @@ -30,11 +30,11 @@ public class SSTableIntervalTree extends IntervalTree<RowPosition, SSTableReader return new SSTableIntervalTree(buildIntervals(sstables)); } - public static List<Interval<RowPosition, SSTableReader>> buildIntervals(Iterable<SSTableReader> sstables) + public static List<Interval<PartitionPosition, SSTableReader>> buildIntervals(Iterable<SSTableReader> sstables) { - List<Interval<RowPosition, SSTableReader>> intervals = new ArrayList<>(Iterables.size(sstables)); + List<Interval<PartitionPosition, SSTableReader>> intervals = new ArrayList<>(Iterables.size(sstables)); for (SSTableReader sstable : sstables) - intervals.add(Interval.<RowPosition, SSTableReader>create(sstable.first, sstable.last, sstable)); + intervals.add(Interval.<PartitionPosition, SSTableReader>create(sstable.first, sstable.last, sstable)); return intervals; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/lifecycle/View.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/lifecycle/View.java b/src/java/org/apache/cassandra/db/lifecycle/View.java index 0d1100b..f710dda 100644 --- a/src/java/org/apache/cassandra/db/lifecycle/View.java +++ b/src/java/org/apache/cassandra/db/lifecycle/View.java @@ -25,7 +25,7 @@ import com.google.common.base.Predicate; import com.google.common.collect.*; import org.apache.cassandra.db.Memtable; -import org.apache.cassandra.db.RowPosition; +import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.io.sstable.format.SSTableReader; import 
org.apache.cassandra.utils.Interval; @@ -126,12 +126,12 @@ public class View return String.format("View(pending_count=%d, sstables=%s, compacting=%s)", liveMemtables.size() + flushingMemtables.size() - 1, sstables, compacting); } - public List<SSTableReader> sstablesInBounds(AbstractBounds<RowPosition> rowBounds) + public List<SSTableReader> sstablesInBounds(AbstractBounds<PartitionPosition> rowBounds) { if (intervalTree.isEmpty()) return Collections.emptyList(); - RowPosition stopInTree = rowBounds.right.isMinimum() ? intervalTree.max() : rowBounds.right; - return intervalTree.search(Interval.<RowPosition, SSTableReader>create(rowBounds.left, stopInTree)); + PartitionPosition stopInTree = rowBounds.right.isMinimum() ? intervalTree.max() : rowBounds.right; + return intervalTree.search(Interval.<PartitionPosition, SSTableReader>create(rowBounds.left, stopInTree)); } // METHODS TO CONSTRUCT FUNCTIONS FOR MODIFYING A VIEW: http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/AbstractType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/AbstractType.java b/src/java/org/apache/cassandra/db/marshal/AbstractType.java index aa25a81..b074b34 100644 --- a/src/java/org/apache/cassandra/db/marshal/AbstractType.java +++ b/src/java/org/apache/cassandra/db/marshal/AbstractType.java @@ -17,6 +17,8 @@ */ package org.apache.cassandra.db.marshal; +import java.io.DataInput; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; @@ -27,11 +29,15 @@ import java.util.Map; import org.apache.cassandra.cql3.CQL3Type; import org.apache.cassandra.cql3.Term; +import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.serializers.TypeSerializer; import org.apache.cassandra.serializers.MarshalException; import org.github.jamm.Unmetered; 
+import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.utils.ByteBufferUtil; /** * Specifies a Comparator for a specific type of ByteBuffer. @@ -87,6 +93,9 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer> /** get a string representation of the bytes suitable for log messages */ public String getString(ByteBuffer bytes) { + if (bytes == null) + return "null"; + TypeSerializer<T> serializer = getSerializer(); serializer.validate(bytes); @@ -132,6 +141,17 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer> return new CQL3Type.Custom(this); } + /** + * Same as compare except that this ignores ReversedType. This is to be used when + * comparing 2 values to decide for a CQL condition (see Operator.isSatisfiedBy) as + * for CQL, ReversedType is simply a "hint" to the storage engine but it does not + * change the meaning of queries per se. + */ + public int compareForCQL(ByteBuffer v1, ByteBuffer v2) + { + return compare(v1, v2); + } + public abstract TypeSerializer<T> getSerializer(); /* convenience method */ @@ -291,6 +311,50 @@ public abstract class AbstractType<T> implements Comparator<ByteBuffer> } /** + * The length of values for this type if all values are of fixed length, -1 otherwise. + */ + protected int valueLengthIfFixed() + { + return -1; + } + + // This assumes that no empty values are passed + public void writeValue(ByteBuffer value, DataOutputPlus out) throws IOException + { + assert value.hasRemaining(); + if (valueLengthIfFixed() >= 0) + out.write(value); + else + ByteBufferUtil.writeWithLength(value, out); + } + + public long writtenLength(ByteBuffer value, TypeSizes sizes) + { + assert value.hasRemaining(); + return valueLengthIfFixed() >= 0 + ? 
value.remaining() + : sizes.sizeofWithLength(value); + } + + public ByteBuffer readValue(DataInput in) throws IOException + { + int length = valueLengthIfFixed(); + if (length >= 0) + return ByteBufferUtil.read(in, length); + else + return ByteBufferUtil.readWithLength(in); + } + + public void skipValue(DataInput in) throws IOException + { + int length = valueLengthIfFixed(); + if (length < 0) + length = in.readInt(); + + FileUtils.skipBytesFully(in, length); + } + + /** * This must be overriden by subclasses if necessary so that for any * AbstractType, this == TypeParser.parse(toString()). * http://git-wip-us.apache.org/repos/asf/cassandra/blob/a991b648/src/java/org/apache/cassandra/db/marshal/BooleanType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/BooleanType.java b/src/java/org/apache/cassandra/db/marshal/BooleanType.java index bfe8c34..f87eb12 100644 --- a/src/java/org/apache/cassandra/db/marshal/BooleanType.java +++ b/src/java/org/apache/cassandra/db/marshal/BooleanType.java @@ -94,4 +94,10 @@ public class BooleanType extends AbstractType<Boolean> { return BooleanSerializer.instance; } + + @Override + protected int valueLengthIfFixed() + { + return 1; + } }
