Merge branch 'cassandra-2.0' into cassandra-2.1
Conflicts:
CHANGES.txt
src/java/org/apache/cassandra/db/Keyspace.java
src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/615612f6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/615612f6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/615612f6
Branch: refs/heads/cassandra-2.1
Commit: 615612f6156652f12d016318274e87e4aed05381
Parents: 17945ab 1f13efe
Author: Aleksey Yeschenko <[email protected]>
Authored: Tue Mar 18 15:25:08 2014 +0300
Committer: Aleksey Yeschenko <[email protected]>
Committed: Tue Mar 18 15:25:08 2014 +0300
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/Keyspace.java | 2 +-
.../db/index/SecondaryIndexManager.java | 85 ++++++++++++--------
3 files changed, 53 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/615612f6/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 384d995,862ea3e..f364efd
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -41,6 -15,8 +41,7 @@@ Merged from 2.0
* Read message id as string from earlier versions (CASSANDRA-6840)
* Properly use the Paxos consistency for (non-protocol) batch
(CASSANDRA-6837)
* Add paranoid disk failure option (CASSANDRA-6646)
+ * Improve PerRowSecondaryIndex performance (CASSANDRA-6876)
-Merged from 1.2:
* add extra SSL cipher suites (CASSANDRA-6613)
* fix nodetool getsstables for blob PK (CASSANDRA-6803)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/615612f6/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Keyspace.java
index fb0f273,f5369f9..436aca0
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@@ -391,11 -389,11 +391,11 @@@ public class Keyspac
if (logger.isDebugEnabled())
logger.debug("Indexing row {} ",
cfs.metadata.getKeyValidator().getString(key.key));
- Set<SecondaryIndex> indexes =
cfs.indexManager.getIndexesByNames(idxNames);
-
- switchLock.readLock().lock();
+ final OpOrder.Group opGroup = cfs.keyspace.writeOrder.start();
try
{
- Collection<SecondaryIndex> indexes =
cfs.indexManager.getIndexesByNames(idxNames);
++ Set<SecondaryIndex> indexes =
cfs.indexManager.getIndexesByNames(idxNames);
+
Iterator<ColumnFamily> pager = QueryPagers.pageRowLocally(cfs,
key.key, DEFAULT_PAGE_SIZE);
while (pager.hasNext())
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/615612f6/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
index 3fba451,5e49966..9d7629c
--- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
+++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java
@@@ -69,9 -66,13 +69,15 @@@ public class SecondaryIndexManage
*
* This allows updates to happen to an entire row at once
*/
- private final Map<Class<? extends SecondaryIndex>,SecondaryIndex>
rowLevelIndexMap;
+ private final ConcurrentMap<Class<? extends SecondaryIndex>,
SecondaryIndex> rowLevelIndexMap;
+
++
+ /**
+ * Keeps all secondary index instances, either per-column or per-row
+ */
+ private final Set<SecondaryIndex> allIndexes;
+
/**
* The underlying column family containing the source data for these indexes
*/
@@@ -103,13 -105,11 +110,11 @@@
// TODO: allow all ColumnDefinition type
for (ColumnDefinition cdef : baseCfs.metadata.allColumns())
- if (cdef.getIndexType() != null &&
!indexedColumnNames.contains(cdef.name))
+ if (cdef.getIndexType() != null &&
!indexedColumnNames.contains(cdef.name.bytes))
addIndexedColumn(cdef);
- Set<SecondaryIndex> reloadedIndexes = Collections.newSetFromMap(new
IdentityHashMap<SecondaryIndex, Boolean>());
- for (SecondaryIndex index : indexesByColumn.values())
- if (reloadedIndexes.add(index))
- index.reload();
+ for (SecondaryIndex index : allIndexes)
+ index.reload();
}
public Set<String> allIndexesNames()
@@@ -143,17 -143,26 +148,26 @@@
flushIndexesBlocking();
- logger.info("Index build of " + idxNames + " complete");
+ logger.info("Index build of {} complete", idxNames);
}
- public boolean indexes(CellName name, Collection<SecondaryIndex> indexes)
- public boolean indexes(ByteBuffer name, Set<SecondaryIndex> indexes)
++ public boolean indexes(CellName name, Set<SecondaryIndex> indexes)
{
- return !indexFor(name, indexes).isEmpty();
+ boolean matching = false;
+ for (SecondaryIndex index : indexes)
+ {
+ if (index.indexes(name))
+ {
+ matching = true;
+ break;
+ }
+ }
+ return matching;
}
- public List<SecondaryIndex> indexFor(CellName name,
Collection<SecondaryIndex> indexes)
- public Set<SecondaryIndex> indexFor(ByteBuffer name, Set<SecondaryIndex>
indexes)
++ public Set<SecondaryIndex> indexFor(CellName name, Set<SecondaryIndex>
indexes)
{
- List<SecondaryIndex> matching = null;
+ Set<SecondaryIndex> matching = null;
for (SecondaryIndex index : indexes)
{
if (index.indexes(name))
@@@ -163,22 -172,22 +177,22 @@@
matching.add(index);
}
}
- return matching == null ? Collections.<SecondaryIndex>emptyList() :
matching;
+ return matching == null ? Collections.<SecondaryIndex>emptySet() :
matching;
}
- public boolean indexes(Column column)
+ public boolean indexes(Cell cell)
{
- return indexes(column.name());
+ return indexes(cell.name());
}
- public boolean indexes(ByteBuffer name)
+ public boolean indexes(CellName name)
{
- return indexes(name, indexesByColumn.values());
+ return indexes(name, allIndexes);
}
- public List<SecondaryIndex> indexFor(CellName name)
- public Set<SecondaryIndex> indexFor(ByteBuffer name)
++ public Set<SecondaryIndex> indexFor(CellName name)
{
- return indexFor(name, indexesByColumn.values());
+ return indexFor(name, allIndexes);
}
/**
@@@ -280,11 -293,14 +297,14 @@@
// so we don't have to lock everything while we do the build. it's up to
// the operator to wait
// until the index is actually built before using in queries.
- indexesByColumn.put(cdef.name, index);
+ indexesByColumn.put(cdef.name.bytes, index);
+ // Add to all indexes set:
+ allIndexes.add(index);
+
// if we're just linking in the index to indexedColumns on an
// already-built index post-restart, we're done
- if (index.isIndexBuilt(cdef.name))
+ if (index.isIndexBuilt(cdef.name.bytes))
return null;
return index.buildIndexAsync();
@@@ -314,22 -330,8 +334,22 @@@
*/
public void flushIndexesBlocking()
{
+ // despatch flushes for all CFS backed indexes
+ List<Future<?>> wait = new ArrayList<>();
+ synchronized (baseCfs.getDataTracker())
+ {
- for (SecondaryIndex index : indexesByColumn.values())
++ for (SecondaryIndex index : allIndexes)
+ if (index.getIndexCfs() != null)
+ wait.add(index.getIndexCfs().forceFlush());
+ }
+
+ // blockingFlush any non-CFS-backed indexes
- for (SecondaryIndex index : indexesByColumn.values())
+ for (SecondaryIndex index : allIndexes)
- index.forceBlockingFlush();
+ if (index.getIndexCfs() == null)
+ index.forceBlockingFlush();
+
+ // wait for the CFS-backed index flushes to complete
+ FBUtilities.waitOnFutures(wait);
}
/**