Revert "fix KEYS index from skipping results". Needs test case + backport to 1.0.9.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0e2706a7 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0e2706a7 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0e2706a7 Branch: refs/heads/cassandra-1.1 Commit: 0e2706a7f1eec0ba161177002e6e25bb58678607 Parents: fe98003 Author: Jonathan Ellis <[email protected]> Authored: Mon Mar 19 10:18:01 2012 -0500 Committer: Jonathan Ellis <[email protected]> Committed: Mon Mar 19 10:18:01 2012 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 - .../db/index/MultiRowIndexSearcherIterator.java | 225 --------------- .../cassandra/db/index/keys/KeysSearcher.java | 136 ++++++++-- 3 files changed, 115 insertions(+), 247 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e2706a7/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ddeb4c4..cb7a9c6 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,7 +1,6 @@ 1.1.1-dev * optimize commitlog checksumming (CASSANDRA-3610) * identify and blacklist corrupted SSTables from future compactions (CASSANDRA-2261) - * fix KEYS index from skipping results (CASSANDRA-3996) 1.1-dev http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e2706a7/src/java/org/apache/cassandra/db/index/MultiRowIndexSearcherIterator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/MultiRowIndexSearcherIterator.java b/src/java/org/apache/cassandra/db/index/MultiRowIndexSearcherIterator.java deleted file mode 100644 index ba7a023..0000000 --- a/src/java/org/apache/cassandra/db/index/MultiRowIndexSearcherIterator.java +++ /dev/null @@ -1,225 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor 
license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.db.index; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.Collections; -import java.util.Iterator; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.db.*; -import org.apache.cassandra.db.filter.ExtendedFilter; -import org.apache.cassandra.db.filter.QueryFilter; -import org.apache.cassandra.db.filter.QueryPath; -import org.apache.cassandra.dht.AbstractBounds; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.thrift.IndexExpression; -import org.apache.cassandra.utils.ByteBufferUtil; - - -/** - * This class is a general searcher that visits rows returned by nextIndexKey(); - */ -public abstract class MultiRowIndexSearcherIterator extends ColumnFamilyStore.AbstractScanIterator -{ - private static final Logger logger = LoggerFactory.getLogger(MultiRowIndexSearcherIterator.class); - private static final Iterator<IColumn> EMPTY_ITERATOR = Collections.<IColumn>emptyList().iterator(); - - /* keys within a row */ - protected final AbstractBounds<RowPosition> range; - private ByteBuffer lastSeenKey; - private final ByteBuffer startKey; - private final ByteBuffer endKey; - - private Iterator<IColumn> 
currentIndexKeyData = null; - private final QueryPath path; - - private final IndexExpression expression; - private final ExtendedFilter filter; - protected final ColumnFamilyStore indexCfs; - private final ColumnFamilyStore baseCfs; - private final boolean rightRangeIsNotMinimum; - protected DecoratedKey curIndexKey; - - private final int rowsPerQuery; - private int columnsRead; - - - public MultiRowIndexSearcherIterator(IndexExpression expression, - ColumnFamilyStore baseCfs, - ColumnFamilyStore indexCfs, - ExtendedFilter filter, - AbstractBounds<RowPosition> range) - { - this.expression = expression; - this.baseCfs = baseCfs; - this.range = range; - this.filter = filter; - this.indexCfs = indexCfs; - - /* - * XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of - * the indexed row unfortunately (which will be inefficient), because we have not way to intuit the small - * possible key having a given token. A fix would be to actually store the token along the key in the - * indexed row. - */ - startKey = range.left instanceof DecoratedKey ? ((DecoratedKey) range.left).key : ByteBufferUtil.EMPTY_BYTE_BUFFER; - endKey = range.right instanceof DecoratedKey ? ((DecoratedKey) range.right).key : ByteBufferUtil.EMPTY_BYTE_BUFFER; - - int meanColumns = Math.max(indexCfs.getMeanColumns(), 1); - - // We shouldn't fetch only 1 row as this provides buggy paging in case the first row doesn't satisfy all clauses - rowsPerQuery = Math.max(Math.min(filter.maxRows(), filter.maxColumns() / meanColumns), 2); - rightRangeIsNotMinimum = !range.right.isMinimum(baseCfs.partitioner); - path = new QueryPath(baseCfs.columnFamily); - - } - - /** - * This function should return indexCfs keys in order they would be scanned by searcher - * @return next key for scanning of null if endOfData - */ - protected abstract DecoratedKey nextIndexKey(); - - /** - * resets internal state preparing for next indexCfs row scan. 
- */ - protected void resetState() - { - curIndexKey = nextIndexKey(); - currentIndexKeyData = EMPTY_ITERATOR; - lastSeenKey = startKey; - columnsRead = Integer.MAX_VALUE; - } - - protected Row computeNext() - { - if (currentIndexKeyData == null) // this is first call. Initialize - resetState(); - - Row result = null; - while (result == null && curIndexKey != null) // curIndexKey would be null when endOfData is reached - { - if (!currentIndexKeyData.hasNext()) // we've finished scanning row page - { - if (columnsRead < rowsPerQuery) // previously we've read less then we queried. No more pages to read within this row - { - logger.debug("Read only {} (< {}) last page through, must be done", columnsRead, rowsPerQuery); - resetState(); - } - else - { - if (logger.isDebugEnabled()) - logger.debug(String.format("Scanning index %s starting with %s", - expressionString(expression), baseCfs.metadata.getKeyValidator().getString(startKey))); - - QueryFilter indexFilter = QueryFilter.getSliceFilter(curIndexKey, - new QueryPath(indexCfs.getColumnFamilyName()), - lastSeenKey, - endKey, - false, - rowsPerQuery); - - ColumnFamily indexRow = indexCfs.getColumnFamily(indexFilter); //get next row page - - if (indexRow != null) - { - Collection<IColumn> sortedColumns = indexRow.getSortedColumns(); - columnsRead = sortedColumns.size(); - currentIndexKeyData = sortedColumns.iterator(); - IColumn firstColumn = sortedColumns.iterator().next(); - - // Paging is racy, so it is possible the first column_name of a page is not the last seen one. 
- if (lastSeenKey != startKey && lastSeenKey.equals(firstColumn.name())) - { - // skip the row we already saw w/ the last page of results - currentIndexKeyData.next(); - logger.debug("Skipping {}", baseCfs.metadata.getKeyValidator().getString(firstColumn.name())); - } - else if (range instanceof Range && currentIndexKeyData.hasNext() && firstColumn.name().equals(startKey)) - { - // skip key excluded by range - currentIndexKeyData.next(); - logger.debug("Skipping first key as range excludes it {}", baseCfs.metadata.getKeyValidator().getString(firstColumn.name())); - } - } - else // page is empty, nothing to scan within this row - { - columnsRead = 0; - currentIndexKeyData = EMPTY_ITERATOR; - } - } - } - - - while (result == null && currentIndexKeyData.hasNext()) // rolling through columns in page - { - IColumn column = currentIndexKeyData.next(); - lastSeenKey = column.name(); - - if (column.isMarkedForDelete()) - { - logger.debug("Skipping {}", column); - continue; - } - - DecoratedKey dk = baseCfs.partitioner.decorateKey(lastSeenKey); - - if (rightRangeIsNotMinimum && range.right.compareTo(dk) < 0) // rightRangeIsNotMinimum is required to serve ring cycles - { - logger.debug("Reached end of assigned scan range"); - resetState(); - } - else if (range.contains(dk)) - { - logger.debug("Returning index hit for {}", dk); - ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, path, filter.initialFilter())); - - // While the column family we'll get in the end should contains the primary clause column_name, - // the initialFilter may not have found it and can thus be null - if (data == null) - data = ColumnFamily.create(baseCfs.metadata); - - result = new Row(dk, data); - } - else - { - logger.debug("Skipping entry {} outside of assigned scan range", dk.token); - } - } - } - - return result == null ? 
endOfData() : result; - } - - private String expressionString(IndexExpression expr) - { - return String.format("'%s.%s %s %s'", - baseCfs.columnFamily, - baseCfs.getComparator().getString(expr.column_name), - expr.op, - baseCfs.metadata.getColumn_metadata().get(expr.column_name).getValidator().getString(expr.value)); - } - - public void close() throws IOException - {} -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e2706a7/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java index 0914c64..bd4be7e 100644 --- a/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java +++ b/src/java/org/apache/cassandra/db/index/keys/KeysSearcher.java @@ -17,18 +17,20 @@ */ package org.apache.cassandra.db.index.keys; +import java.io.IOException; import java.nio.ByteBuffer; import java.util.*; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.*; -import org.apache.cassandra.db.index.MultiRowIndexSearcherIterator; import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.db.index.SecondaryIndexSearcher; import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Range; import org.apache.cassandra.thrift.IndexExpression; import org.apache.cassandra.thrift.IndexOperator; +import org.apache.cassandra.utils.ByteBufferUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,6 +66,15 @@ public class KeysSearcher extends SecondaryIndexSearcher return best; } + private String expressionString(IndexExpression expr) + { + return String.format("'%s.%s %s %s'", + baseCfs.columnFamily, + baseCfs.getComparator().getString(expr.column_name), + expr.op, + 
baseCfs.metadata.getColumn_metadata().get(expr.column_name).getValidator().getString(expr.value)); + } + public boolean isIndexing(List<IndexExpression> clause) { return highestSelectivityPredicate(clause) != null; @@ -84,32 +95,115 @@ public class KeysSearcher extends SecondaryIndexSearcher // TODO: allow merge join instead of just one index + loop final IndexExpression primary = highestSelectivityPredicate(filter.getClause()); final SecondaryIndex index = indexManager.getIndexForColumn(primary.column_name); - if (logger.isDebugEnabled()) logger.debug("Primary scan clause is " + baseCfs.getComparator().getString(primary.column_name)); - assert index != null; - return new KeysMultiRowIndexSearcherIterator(primary, filter, range, indexManager, index); - } + final DecoratedKey indexKey = indexManager.getIndexKeyFor(primary.column_name, primary.value); - public class KeysMultiRowIndexSearcherIterator extends MultiRowIndexSearcherIterator - { - final DecoratedKey indexKey; + /* + * XXX: If the range requested is a token range, we'll have to start at the beginning (and stop at the end) of + * the indexed row unfortunately (which will be inefficient), because we have not way to intuit the small + * possible key having a given token. A fix would be to actually store the token along the key in the + * indexed row. + */ + final ByteBuffer startKey = range.left instanceof DecoratedKey ? ((DecoratedKey)range.left).key : ByteBufferUtil.EMPTY_BYTE_BUFFER; + final ByteBuffer endKey = range.right instanceof DecoratedKey ? 
((DecoratedKey)range.right).key : ByteBufferUtil.EMPTY_BYTE_BUFFER; - public KeysMultiRowIndexSearcherIterator(IndexExpression expression, - ExtendedFilter filter, - AbstractBounds<RowPosition> range, - SecondaryIndexManager indexManager, - SecondaryIndex index) + return new ColumnFamilyStore.AbstractScanIterator() { - super(expression, baseCfs, index.getIndexCfs(), filter, range); - indexKey = indexManager.getIndexKeyFor(expression.column_name, expression.value); - } + private ByteBuffer lastSeenKey = startKey; + private Iterator<IColumn> indexColumns; + private final QueryPath path = new QueryPath(baseCfs.columnFamily); + private int columnsRead = Integer.MAX_VALUE; - @Override - protected final DecoratedKey nextIndexKey() - { - return curIndexKey == null ? indexKey : null; // keys index always scan single row in indexCfs - } + protected Row computeNext() + { + int meanColumns = Math.max(index.getIndexCfs().getMeanColumns(), 1); + // We shouldn't fetch only 1 row as this provides buggy paging in case the first row doesn't satisfy all clauses + int rowsPerQuery = Math.max(Math.min(filter.maxRows(), filter.maxColumns() / meanColumns), 2); + while (true) + { + if (indexColumns == null || !indexColumns.hasNext()) + { + if (columnsRead < rowsPerQuery) + { + logger.debug("Read only {} (< {}) last page through, must be done", columnsRead, rowsPerQuery); + return endOfData(); + } + + if (logger.isDebugEnabled()) + logger.debug(String.format("Scanning index %s starting with %s", + expressionString(primary), index.getBaseCfs().metadata.getKeyValidator().getString(startKey))); + + QueryFilter indexFilter = QueryFilter.getSliceFilter(indexKey, + new QueryPath(index.getIndexCfs().getColumnFamilyName()), + lastSeenKey, + endKey, + false, + rowsPerQuery); + ColumnFamily indexRow = index.getIndexCfs().getColumnFamily(indexFilter); + logger.debug("fetched {}", indexRow); + if (indexRow == null) + { + logger.debug("no data, all done"); + return endOfData(); + } + + 
Collection<IColumn> sortedColumns = indexRow.getSortedColumns(); + columnsRead = sortedColumns.size(); + indexColumns = sortedColumns.iterator(); + IColumn firstColumn = sortedColumns.iterator().next(); + + // Paging is racy, so it is possible the first column of a page is not the last seen one. + if (lastSeenKey != startKey && lastSeenKey.equals(firstColumn.name())) + { + // skip the row we already saw w/ the last page of results + indexColumns.next(); + columnsRead--; + logger.debug("Skipping {}", baseCfs.metadata.getKeyValidator().getString(firstColumn.name())); + } + else if (range instanceof Range && indexColumns.hasNext() && firstColumn.name().equals(startKey)) + { + // skip key excluded by range + indexColumns.next(); + columnsRead--; + logger.debug("Skipping first key as range excludes it"); + } + } + + while (indexColumns.hasNext()) + { + IColumn column = indexColumns.next(); + lastSeenKey = column.name(); + if (column.isMarkedForDelete()) + { + logger.debug("skipping {}", column.name()); + continue; + } + + DecoratedKey dk = baseCfs.partitioner.decorateKey(lastSeenKey); + if (!range.right.isMinimum(baseCfs.partitioner) && range.right.compareTo(dk) < 0) + { + logger.debug("Reached end of assigned scan range"); + return endOfData(); + } + if (!range.contains(dk)) + { + logger.debug("Skipping entry {} outside of assigned scan range", dk.token); + continue; + } + + logger.debug("Returning index hit for {}", dk); + ColumnFamily data = baseCfs.getColumnFamily(new QueryFilter(dk, path, filter.initialFilter())); + // While the column family we'll get in the end should contains the primary clause column, the initialFilter may not have found it and can thus be null + if (data == null) + data = ColumnFamily.create(baseCfs.metadata); + return new Row(dk, data); + } + } + } + + public void close() throws IOException {} + }; } }
