Author: gdusbabek
Date: Fri Nov 19 15:28:51 2010
New Revision: 1036898
URL: http://svn.apache.org/viewvc?rev=1036898&view=rev
Log:
Make addIndex asynchronous and race-proof. Patch by jbellis; reviewed by
gdusbabek for CASSANDRA-1715.
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1036898&r1=1036897&r2=1036898&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri
Nov 19 15:28:51 2010
@@ -113,7 +113,7 @@ public class ColumnFamilyStore implement
/* active memtable associated with this ColumnFamilyStore. */
private Memtable memtable;
- private final SortedMap<ByteBuffer, ColumnFamilyStore> indexedColumns;
+ private final ConcurrentSkipListMap<ByteBuffer, ColumnFamilyStore>
indexedColumns;
// TODO binarymemtable ops are not threadsafe (do they need to be?)
private AtomicReference<BinaryMemtable> binaryMemtable;
@@ -174,37 +174,25 @@ public class ColumnFamilyStore implement
ssTables.updateCacheSizes();
// figure out what needs to be added and dropped.
- final Set<ByteBuffer> indexesToDrop = new HashSet<ByteBuffer>();
- final Set<ColumnDefinition> indexesToAdd = new
HashSet<ColumnDefinition>();
-
- for (ColumnDefinition cdef : metadata.getColumn_metadata().values())
- if (!indexedColumns.containsKey(cdef.name))
- indexesToAdd.add(cdef);
- for (ByteBuffer indexName : indexedColumns.keySet())
- if (!metadata.getColumn_metadata().containsKey(indexName))
- indexesToDrop.add(indexName);
// future: if/when we have modifiable settings for secondary indexes,
they'll need to be handled here.
-
- final Runnable indexMaintenance = new Runnable()
+ for (ByteBuffer indexName : indexedColumns.keySet())
{
- public void run()
+ if (!metadata.getColumn_metadata().containsKey(indexName))
{
- // drop indexes no longer needed.
- for (ByteBuffer indexName : indexesToDrop)
+ ColumnFamilyStore indexCfs = indexedColumns.remove(indexName);
+ if (indexCfs == null)
{
- ColumnFamilyStore indexCfs =
indexedColumns.remove(indexName);
- assert indexCfs != null;
- SystemTable.setIndexRemoved(metadata.tableName,
metadata.cfName);
- indexCfs.removeAllSSTables();
- }
- // add new indexes.
- for (ColumnDefinition info : indexesToAdd)
- if (info.getIndexType() != null)
- addIndex(info);
+ logger.debug("index {} already removed; ignoring",
FBUtilities.bytesToHex(indexName));
+ continue;
+ }
+ SystemTable.setIndexRemoved(metadata.tableName,
metadata.cfName);
+ indexCfs.removeAllSSTables();
}
- };
- // reset the memtable with new settings.
- maybeSwitchMemtable(memtable, true, indexMaintenance);
+ }
+
+ for (ColumnDefinition cdef : metadata.getColumn_metadata().values())
+ if (!indexedColumns.containsKey(cdef.name) && cdef.getIndexType()
!= null)
+ addIndex(cdef);
}
private ColumnFamilyStore(Table table, String columnFamilyName,
IPartitioner partitioner, int generation, CFMetaData metadata)
@@ -310,6 +298,8 @@ public class ColumnFamilyStore implement
public void addIndex(final ColumnDefinition info)
{
assert info.getIndexType() != null;
+
+ // create the index CFS
IPartitioner rowPartitioner = StorageService.getPartitioner();
AbstractType columnComparator = (rowPartitioner instanceof
OrderPreservingPartitioner || rowPartitioner instanceof ByteOrderedPartitioner)
? BytesType.instance
@@ -319,28 +309,42 @@ public class ColumnFamilyStore implement
indexedCfMetadata.cfName,
new LocalPartitioner(metadata.getColumn_metadata().get(info.name).validator),
indexedCfMetadata);
- // record that the column is supposed to be indexed, before we start
building it
- // (so we don't omit indexing writes that happen during build process)
- indexedColumns.put(info.name, indexedCfs);
- if (!SystemTable.isIndexBuilt(table.name, indexedCfMetadata.cfName))
+
+ // link in indexedColumns. this means that writes will add new data
to the index immediately,
+ // so we don't have to lock everything while we do the build. it's up
to the operator to wait
+ // until the index is actually built before using in queries.
+ if (indexedColumns.putIfAbsent(info.name, indexedCfs) != null)
+ return;
+
+ // if we're just linking in the index to indexedColumns on an
already-built index post-restart, we're done
+ if (SystemTable.isIndexBuilt(table.name, indexedCfMetadata.cfName))
+ return;
+
+ // build it asynchronously; addIndex gets called by CFS open and
schema update, neither of which
+ // we want to block for a long period. (actual build is serialized on
CompactionManager.)
+ Runnable runnable = new Runnable()
{
- logger.info("Creating index {}.{}", table,
indexedCfMetadata.cfName);
- try
- {
- forceBlockingFlush();
- }
- catch (ExecutionException e)
- {
- throw new RuntimeException(e);
- }
- catch (InterruptedException e)
+ public void run()
{
- throw new AssertionError(e);
+ logger.info("Creating index {}.{}", table,
indexedCfMetadata.cfName);
+ try
+ {
+ forceBlockingFlush();
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ buildSecondaryIndexes(getSSTables(),
FBUtilities.singleton(info.name));
+ logger.info("Index {} complete", indexedCfMetadata.cfName);
+ SystemTable.setIndexBuilt(table.name,
indexedCfMetadata.cfName);
}
- buildSecondaryIndexes(getSSTables(),
FBUtilities.singleton(info.name));
- logger.info("Index {} complete", indexedCfMetadata.cfName);
- SystemTable.setIndexBuilt(table.name, indexedCfMetadata.cfName);
- }
+ };
+ new Thread(runnable, "Create index " +
indexedCfMetadata.cfName).start();
}
public void buildSecondaryIndexes(Collection<SSTableReader> sstables,
SortedSet<ByteBuffer> columns)
@@ -608,7 +612,7 @@ public class ColumnFamilyStore implement
}
/** flush the given memtable and swap in a new one for its CFS, if it
hasn't been frozen already. threadsafe. */
- Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean
writeCommitLog, final Runnable postFlush)
+ Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean
writeCommitLog)
{
/*
* If we can get the writelock, that means no new updates can come in
and
@@ -663,8 +667,6 @@ public class ColumnFamilyStore implement
// the log header with "you can discard anything
written before the context" is not valid
CommitLog.instance.discardCompletedSegments(metadata.cfId, ctx);
}
- if (postFlush != null)
- postFlush.run();
}
});
}
@@ -696,7 +698,7 @@ public class ColumnFamilyStore implement
if (memtable.isClean())
return null;
- return maybeSwitchMemtable(memtable, true, null);
+ return maybeSwitchMemtable(memtable, true);
}
public void forceBlockingFlush() throws ExecutionException,
InterruptedException
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1036898&r1=1036897&r2=1036898&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Fri Nov 19
15:28:51 2010
@@ -448,7 +448,7 @@ public class Table
// flush memtables that got filled up outside the readlock
(maybeSwitchMemtable acquires writeLock).
// usually mTF will be empty and this will be a no-op.
for (Memtable memtable : memtablesToFlush)
- memtable.cfs.maybeSwitchMemtable(memtable, writeCommitLog, null);
+ memtable.cfs.maybeSwitchMemtable(memtable, writeCommitLog);
}
private static List<Memtable> addFullMemtable(List<Memtable>
memtablesToFlush, Memtable fullMemtable)
@@ -594,7 +594,7 @@ public class Table
// during index build, we do flush index memtables separately
from master; otherwise we could OOM
for (Memtable memtable : memtablesToFlush)
- memtable.cfs.maybeSwitchMemtable(memtable, false, null);
+ memtable.cfs.maybeSwitchMemtable(memtable, false);
}
try