Author: jbellis
Date: Wed Sep 22 03:42:09 2010
New Revision: 999744
URL: http://svn.apache.org/viewvc?rev=999744&view=rev
Log:
bug fixes:
- stop omitting index writes that happen during the index-build process
- fix circular Table.open during index creation by passing Table object to CFS
instead of String name
- avoid writing empty index sstables
patch by jbellis; reviewed by gdusbabek for CASSANDRA-1415
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=999744&r1=999743&r2=999744&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed
Sep 22 03:42:09 2010
@@ -33,7 +33,6 @@ import javax.management.ObjectName;
import com.google.common.collect.Iterables;
import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -101,7 +100,7 @@ public class ColumnFamilyStore implement
private Set<Memtable> memtablesPendingFlush = new
ConcurrentSkipListSet<Memtable>();
- public final String table;
+ public final Table table;
public final String columnFamily;
public final IPartitioner partitioner;
private final String mbeanName;
@@ -131,7 +130,7 @@ public class ColumnFamilyStore implement
private int minCompactionThreshold;
private int maxCompactionThreshold;
- private ColumnFamilyStore(String table, String columnFamilyName,
IPartitioner partitioner, int generation, CFMetaData metadata)
+ private ColumnFamilyStore(Table table, String columnFamilyName,
IPartitioner partitioner, int generation, CFMetaData metadata)
{
assert metadata != null : "null metadata for " + table + ":" +
columnFamilyName;
this.table = table;
@@ -149,7 +148,7 @@ public class ColumnFamilyStore implement
// scan for sstables corresponding to this cf and load them
List<SSTableReader> sstables = new ArrayList<SSTableReader>();
- for (Map.Entry<Descriptor,Set<Component>> sstableFiles : files(table,
columnFamilyName, false).entrySet())
+ for (Map.Entry<Descriptor,Set<Component>> sstableFiles :
files(table.name, columnFamilyName, false).entrySet())
{
SSTableReader sstable;
try
@@ -163,11 +162,11 @@ public class ColumnFamilyStore implement
}
sstables.add(sstable);
}
- ssTables = new SSTableTracker(table, columnFamilyName);
+ ssTables = new SSTableTracker(table.name, columnFamilyName);
ssTables.add(sstables);
// create the private ColumnFamilyStores for the secondary column
indexes
- indexedColumns = new TreeMap<byte[],
ColumnFamilyStore>(getComparator());
+ indexedColumns = new ConcurrentSkipListMap<byte[],
ColumnFamilyStore>(getComparator());
for (ColumnDefinition info : metadata.column_metadata.values())
{
if (info.index_type != null)
@@ -196,32 +195,39 @@ public class ColumnFamilyStore implement
AbstractType columnComparator = (rowPartitioner instanceof
OrderPreservingPartitioner || rowPartitioner instanceof ByteOrderedPartitioner)
? BytesType.instance
: new
LocalByPartionerType(StorageService.getPartitioner());
- final CFMetaData indexedCfMetadata =
CFMetaData.newIndexMetadata(table, columnFamily, info, columnComparator);
+ final CFMetaData indexedCfMetadata =
CFMetaData.newIndexMetadata(table.name, columnFamily, info, columnComparator);
ColumnFamilyStore indexedCfs =
ColumnFamilyStore.createColumnFamilyStore(table,
indexedCfMetadata.cfName,
new LocalPartitioner(metadata.column_metadata.get(info.name).validator),
indexedCfMetadata);
- if (!SystemTable.isIndexBuilt(table, indexedCfMetadata.cfName))
+ // record that the column is supposed to be indexed, before we start
building it
+ // (so we don't omit indexing writes that happen during build process)
+ indexedColumns.put(info.name, indexedCfs);
+ if (!SystemTable.isIndexBuilt(table.name, indexedCfMetadata.cfName))
{
logger.info("Creating index {}.{}", table,
indexedCfMetadata.cfName);
- Runnable runnable = new WrappedRunnable()
+ try
{
- public void runMayThrow() throws IOException
- {
- buildSecondaryIndexes(getSSTables(),
FBUtilities.getSingleColumnSet(info.name));
- logger.info("Index {} complete", indexedCfMetadata.cfName);
- SystemTable.setIndexBuilt(table, indexedCfMetadata.cfName);
- }
- };
- forceFlush(runnable);
+ forceBlockingFlush();
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e);
+ }
+ buildSecondaryIndexes(getSSTables(),
FBUtilities.getSingleColumnSet(info.name));
+ logger.info("Index {} complete", indexedCfMetadata.cfName);
+ SystemTable.setIndexBuilt(table.name, indexedCfMetadata.cfName);
}
- indexedColumns.put(info.name, indexedCfs);
}
public void buildSecondaryIndexes(Collection<SSTableReader> sstables,
SortedSet<byte[]> columns)
{
logger.debug("Submitting index build to compactionmanager");
- Table.IndexBuilder builder =
Table.open(table).createIndexBuilder(this, columns, new
ReducingKeyIterator(sstables));
+ Table.IndexBuilder builder = table.createIndexBuilder(this, columns,
new ReducingKeyIterator(sstables));
Future future = CompactionManager.instance.submitIndexBuild(this,
builder);
try
{
@@ -305,16 +311,16 @@ public class ColumnFamilyStore implement
return count > 0 ? (int) (sum / count) : 0;
}
- public static ColumnFamilyStore createColumnFamilyStore(String table,
String columnFamily)
+ public static ColumnFamilyStore createColumnFamilyStore(Table table,
String columnFamily)
{
- return createColumnFamilyStore(table, columnFamily,
StorageService.getPartitioner(), DatabaseDescriptor.getCFMetaData(table,
columnFamily));
+ return createColumnFamilyStore(table, columnFamily,
StorageService.getPartitioner(), DatabaseDescriptor.getCFMetaData(table.name,
columnFamily));
}
- public static synchronized ColumnFamilyStore
createColumnFamilyStore(String table, String columnFamily, IPartitioner
partitioner, CFMetaData metadata)
+ public static synchronized ColumnFamilyStore createColumnFamilyStore(Table
table, String columnFamily, IPartitioner partitioner, CFMetaData metadata)
{
// get the max generation number, to prevent generation conflicts
List<Integer> generations = new ArrayList<Integer>();
- for (Descriptor desc : files(table, columnFamily, true).keySet())
+ for (Descriptor desc : files(table.name, columnFamily, true).keySet())
generations.add(desc.generation);
Collections.sort(generations);
int value = (generations.size() > 0) ?
(generations.get(generations.size() - 1)) : 0;
@@ -411,7 +417,7 @@ public class ColumnFamilyStore implement
public String getFlushPath()
{
long guessedSize = 2 * DatabaseDescriptor.getMemtableThroughput() *
1024*1024; // 2* adds room for keys, column indexes
- String location =
DatabaseDescriptor.getDataFileLocationForTable(table, guessedSize);
+ String location =
DatabaseDescriptor.getDataFileLocationForTable(table.name, guessedSize);
if (location == null)
throw new RuntimeException("Insufficient disk space to flush");
return getTempSSTablePath(location);
@@ -420,7 +426,7 @@ public class ColumnFamilyStore implement
public String getTempSSTablePath(String directory)
{
Descriptor desc = new Descriptor(new File(directory),
- table,
+ table.name,
columnFamily,
fileIndexGenerator.incrementAndGet(),
true);
@@ -428,7 +434,7 @@ public class ColumnFamilyStore implement
}
/** flush the given memtable and swap in a new one for its CFS, if it
hasn't been frozen already. threadsafe. */
- Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean
writeCommitLog, final Runnable afterFlush)
+ Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean
writeCommitLog)
{
/**
* If we can get the writelock, that means no new updates can come in
and
@@ -446,9 +452,16 @@ public class ColumnFamilyStore implement
final CommitLogSegment.CommitLogContext ctx = writeCommitLog ?
CommitLog.instance().getContext() : null;
logger.info("switching in a fresh Memtable for " + columnFamily +
" at " + ctx);
- // submit the memtable for any indexed sub-cfses, and our own
- final CountDownLatch latch = new CountDownLatch(1 +
indexedColumns.size());
- for (ColumnFamilyStore cfs :
Iterables.concat(indexedColumns.values(), Collections.singleton(this)))
+ // submit the memtable for any indexed sub-cfses, and our own.
+ List<ColumnFamilyStore> icc = new
ArrayList<ColumnFamilyStore>(indexedColumns.size());
+ icc.add(this);
+ for (ColumnFamilyStore indexCfs : indexedColumns.values())
+ {
+ if (!indexCfs.memtable.isClean())
+ icc.add(indexCfs);
+ }
+ final CountDownLatch latch = new CountDownLatch(icc.size());
+ for (ColumnFamilyStore cfs : icc)
{
submitFlush(cfs.memtable, latch);
cfs.memtable = new Memtable(cfs, cfs.partitioner);
@@ -467,8 +480,6 @@ public class ColumnFamilyStore implement
// if we're not writing to the commit log, we are
replaying the log, so marking
// the log header with "you can discard anything
written before the context" is not valid
CommitLog.instance().discardCompletedSegments(metadata.cfId, ctx);
- if (afterFlush != null)
- afterFlush.run();
}
}
});
@@ -498,15 +509,10 @@ public class ColumnFamilyStore implement
public Future<?> forceFlush()
{
- return forceFlush(null);
- }
-
- public Future<?> forceFlush(Runnable afterFlush)
- {
if (memtable.isClean())
return null;
- return maybeSwitchMemtable(memtable, true, afterFlush);
+ return maybeSwitchMemtable(memtable, true);
}
public void forceBlockingFlush() throws ExecutionException,
InterruptedException
@@ -721,11 +727,6 @@ public class ColumnFamilyStore implement
CompactionManager.instance.submitCleanup(ColumnFamilyStore.this);
}
- public Table getTable()
- {
- return Table.open(table);
- }
-
void markCompacted(Collection<SSTableReader> sstables)
{
ssTables.markCompacted(sstables);
@@ -1342,7 +1343,7 @@ public class ColumnFamilyStore implement
{
// mkdir
File dataDirectory =
ssTable.getDescriptor().directory.getParentFile();
- String snapshotDirectoryPath =
Table.getSnapshotPath(dataDirectory.getAbsolutePath(), table, snapshotName);
+ String snapshotDirectoryPath =
Table.getSnapshotPath(dataDirectory.getAbsolutePath(), table.name,
snapshotName);
FileUtils.createDirectory(snapshotDirectoryPath);
// hard links
@@ -1531,7 +1532,7 @@ public class ColumnFamilyStore implement
// complete as much of the job as possible. Don't let errors along the
way prevent as much renaming as possible
// from happening.
IOException mostRecentProblem = null;
- for (File existing : DefsTable.getFiles(table, columnFamily))
+ for (File existing : DefsTable.getFiles(table.name, columnFamily))
{
try
{
@@ -1552,11 +1553,6 @@ public class ColumnFamilyStore implement
}
}
- public static Future<?> submitPostFlush(Runnable runnable)
- {
- return postFlushExecutor.submit(runnable);
- }
-
public long getBloomFilterFalsePositives()
{
long count = 0L;
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java?rev=999744&r1=999743&r2=999744&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/CompactionManager.java Wed
Sep 22 03:42:09 2010
@@ -220,7 +220,7 @@ public class CompactionManager implement
{
// The collection of sstables passed may be empty (but not null); even
if
// it is not empty, it may compact down to nothing if all rows are
deleted.
- Table table = cfs.getTable();
+ Table table = cfs.table;
if (DatabaseDescriptor.isSnapshotBeforeCompaction())
table.snapshot("compact-" + cfs.columnFamily);
logger.info("Compacting [" + StringUtils.join(sstables, ",") + "]");
@@ -324,7 +324,7 @@ public class CompactionManager implement
private List<SSTableReader> doAntiCompaction(ColumnFamilyStore cfs,
Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress
target)
throws IOException
{
- Table table = cfs.getTable();
+ Table table = cfs.table;
logger.info("AntiCompacting [" + StringUtils.join(sstables, ",") +
"]");
// Calculate the expected compacted filesize
long expectedRangeFileSize =
cfs.getExpectedCompactedFileSize(sstables) / 2;
@@ -396,7 +396,7 @@ public class CompactionManager implement
private void doCleanupCompaction(ColumnFamilyStore cfs) throws IOException
{
Collection<SSTableReader> originalSSTables = cfs.getSSTables();
- List<SSTableReader> sstables = doAntiCompaction(cfs, originalSSTables,
StorageService.instance.getLocalRanges(cfs.getTable().name), null);
+ List<SSTableReader> sstables = doAntiCompaction(cfs, originalSSTables,
StorageService.instance.getLocalRanges(cfs.table.name), null);
if (!sstables.isEmpty())
{
cfs.replaceCompactedSSTables(originalSSTables, sstables);
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=999744&r1=999743&r2=999744&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Wed Sep 22
03:42:09 2010
@@ -194,7 +194,7 @@ public class Memtable implements Compara
public String getTableName()
{
- return cfs.getTable().name;
+ return cfs.table.name;
}
/**
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java?rev=999744&r1=999743&r2=999744&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/SystemTable.java Wed Sep
22 03:42:09 2010
@@ -347,13 +347,20 @@ public class SystemTable
return cfs.getColumnFamily(filter) != null;
}
- public static void setIndexBuilt(String table, String indexName) throws
IOException
+ public static void setIndexBuilt(String table, String indexName)
{
ColumnFamily cf = ColumnFamily.create(Table.SYSTEM_TABLE, INDEX_CF);
cf.addColumn(new Column(indexName.getBytes(UTF_8),
ArrayUtils.EMPTY_BYTE_ARRAY, new TimestampClock(System.currentTimeMillis())));
RowMutation rm = new RowMutation(Table.SYSTEM_TABLE,
table.getBytes(UTF_8));
rm.add(cf);
- rm.apply();
+ try
+ {
+ rm.apply();
+ }
+ catch (IOException e)
+ {
+ throw new IOError(e);
+ }
}
public static class StorageMetadata
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=999744&r1=999743&r2=999744&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Wed Sep 22
03:42:09 2010
@@ -241,7 +241,10 @@ public class Table
}
for (CFMetaData cfm : new
ArrayList<CFMetaData>(DatabaseDescriptor.getTableDefinition(table).cfMetaData().values()))
+ {
+ logger.debug("Initializing {}.{}", name, cfm.cfName);
initCf(cfm.cfId, cfm.cfName);
+ }
// check 10x as often as the lifetime, so we can exceed lifetime by
10% at most
int checkMs = DatabaseDescriptor.getMemtableLifetimeMS() / 10;
@@ -292,7 +295,7 @@ public class Table
{
assert !columnFamilyStores.containsKey(cfId) : String.format("tried to
init %s as %s, but already used by %s",
cfName,
cfId, columnFamilyStores.get(cfId));
- columnFamilyStores.put(cfId,
ColumnFamilyStore.createColumnFamilyStore(name, cfName));
+ columnFamilyStores.put(cfId,
ColumnFamilyStore.createColumnFamilyStore(this, cfName));
}
public void reloadCf(Integer cfId) throws IOException
@@ -386,7 +389,7 @@ public class Table
// flush memtables that got filled up. usually mTF will be empty and
this will be a no-op
for (Map.Entry<ColumnFamilyStore, Memtable> entry :
memtablesToFlush.entrySet())
- entry.getKey().maybeSwitchMemtable(entry.getValue(),
writeCommitLog, null);
+ entry.getKey().maybeSwitchMemtable(entry.getValue(),
writeCommitLog);
}
private static void ignoreObsoleteMutations(ColumnFamily cf,
AbstractReconciler reconciler, SortedSet<byte[]> mutatedIndexedColumns,
ColumnFamily oldIndexedColumns)
@@ -485,7 +488,7 @@ public class Table
}
for (Map.Entry<ColumnFamilyStore, Memtable> entry :
memtablesToFlush.entrySet())
- entry.getKey().maybeSwitchMemtable(entry.getValue(),
false, null);
+ entry.getKey().maybeSwitchMemtable(entry.getValue(),
false);
}
try