Author: jbellis
Date: Thu Mar 10 18:38:03 2011
New Revision: 1080310
URL: http://svn.apache.org/viewvc?rev=1080310&view=rev
Log:
Include secondary index CFs when deciding which CFs to flush under memory pressure.
patch by jbellis; tested by Matt Conway for CASSANDRA-2295
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1080310&r1=1080309&r2=1080310&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java	(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/ColumnFamilyStore.java	Thu Mar 10 18:38:03 2011
@@ -32,6 +32,7 @@ import javax.management.ObjectName;
import com.google.common.collect.Iterables;
import org.apache.commons.collections.IteratorUtils;
+import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -334,7 +335,6 @@ public class ColumnFamilyStore implement
{
public void run()
{
- logger.info("Creating index {}.{}", table,
indexedCfMetadata.cfName);
try
{
forceBlockingFlush();
@@ -348,7 +348,6 @@ public class ColumnFamilyStore implement
throw new AssertionError(e);
}
buildSecondaryIndexes(getSSTables(),
FBUtilities.singleton(info.name));
- logger.info("Index {} complete", indexedCfMetadata.cfName);
SystemTable.setIndexBuilt(table.name,
indexedCfMetadata.cfName);
}
};
@@ -357,7 +356,8 @@ public class ColumnFamilyStore implement
public void buildSecondaryIndexes(Collection<SSTableReader> sstables,
SortedSet<ByteBuffer> columns)
{
- logger.debug("Submitting index build to compactionmanager");
+ logger.info(String.format("Submitting index build of %s for data in
%s",
+ metadata.comparator.getString(columns),
StringUtils.join(sstables, ", ")));
Table.IndexBuilder builder = table.createIndexBuilder(this, columns,
new ReducingKeyIterator(sstables));
Future future = CompactionManager.instance.submitIndexBuild(this,
builder);
try
@@ -374,6 +374,7 @@ public class ColumnFamilyStore implement
{
throw new RuntimeException(e);
}
+ logger.info("Index build of " + metadata.comparator.getString(columns)
+ " complete");
}
// called when dropping or renaming a CF. Performs mbean housekeeping and
invalidates CFS to other operations.
@@ -684,26 +685,31 @@ public class ColumnFamilyStore implement
try
{
if (oldMemtable.isFrozen())
+ {
+ logger.debug("memtable is already frozen; another thread must
be flushing it");
return null;
+ }
boolean isDropped = isIndex()
? DatabaseDescriptor.getCFMetaData(table.name,
getParentColumnfamily()) == null
:
DatabaseDescriptor.getCFMetaData(metadata.cfId) == null;
if (isDropped)
- return null; // column family was dropped. no point in
flushing.
+ {
+ logger.debug("column family was dropped; no point in
flushing");
+ return null;
+ }
assert memtable == oldMemtable;
memtable.freeze();
final CommitLogSegment.CommitLogContext ctx = writeCommitLog ?
CommitLog.instance.getContext() : null;
- logger.info("switching in a fresh Memtable for " + columnFamily +
" at " + ctx);
// submit the memtable for any indexed sub-cfses, and our own.
List<ColumnFamilyStore> icc = new
ArrayList<ColumnFamilyStore>(indexedColumns.size());
- icc.add(this);
- for (ColumnFamilyStore indexCfs : indexedColumns.values())
+ // don't assume that this.memtable is dirty; forceFlush can bring
us here during index build even if it is not
+ for (ColumnFamilyStore cfs :
Iterables.concat(Collections.singleton(this), indexedColumns.values()))
{
- if (!indexCfs.memtable.isClean())
- icc.add(indexCfs);
+ if (!cfs.memtable.isClean())
+ icc.add(cfs);
}
final CountDownLatch latch = new CountDownLatch(icc.size());
for (ColumnFamilyStore cfs : icc)
@@ -711,6 +717,10 @@ public class ColumnFamilyStore implement
submitFlush(cfs.memtable, latch);
cfs.memtable = new Memtable(cfs);
}
+ // we marked our memtable as frozen as part of the concurrency
control,
+ // so even if there was nothing to flush we need to switch it out
+ if (!icc.contains(this))
+ memtable = new Memtable(this);
// when all the memtables have been written, including for
indexes, mark the flush in the commitlog header.
// a second executor makes sure the onMemtableFlushes get called
in the right order,
@@ -754,8 +764,17 @@ public class ColumnFamilyStore implement
public Future<?> forceFlush()
{
- if (memtable.isClean())
+ // during index build, 2ary index memtables can be dirty even if
parent is not. if so,
+ // we want flushLargestMemtables to flush the 2ary index ones too.
+ boolean clean = true;
+ for (ColumnFamilyStore cfs :
Iterables.concat(Collections.singleton(this), getIndexColumnFamilyStores()))
+ clean &= cfs.memtable.isClean();
+
+ if (clean)
+ {
+ logger.debug("forceFlush requested but everything is clean");
return null;
+ }
return maybeSwitchMemtable(memtable, true);
}
@@ -1937,6 +1956,11 @@ public class ColumnFamilyStore implement
return indexedColumns.get(column);
}
+ public Collection<ColumnFamilyStore> getIndexColumnFamilyStores()
+ {
+ return indexedColumns.values();
+ }
+
public ColumnFamily newIndexedColumnFamily(ByteBuffer column)
{
return ColumnFamily.create(indexedColumns.get(column).metadata);
Modified:
cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java
URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java?rev=1080310&r1=1080309&r2=1080310&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java	(original)
+++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/service/StorageService.java	Thu Mar 10 18:38:03 2011
@@ -31,6 +31,7 @@ import javax.management.ObjectName;
import com.google.common.base.Charsets;
import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import org.apache.cassandra.db.commitlog.CommitLog;
@@ -2195,14 +2196,28 @@ public class StorageService implements I
ColumnFamilyStore largestByThroughput = null;
for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
{
- if (largestByOps == null || cfs.getMemtableColumnsCount() >
largestByOps.getMemtableColumnsCount())
+ long ops = 0;
+ long throughput = 0;
+ for (ColumnFamilyStore subordinate :
Iterables.concat(Collections.singleton(cfs), cfs.getIndexColumnFamilyStores()))
+ {
+ ops += subordinate.getMemtableColumnsCount();
+ throughput = subordinate.getMemtableThroughputInMB();
+ }
+
+ if (ops > 0 && (largestByOps == null || ops >
largestByOps.getMemtableColumnsCount()))
+ {
+ logger_.debug(ops + " total ops in " + cfs);
largestByOps = cfs;
- if (largestByThroughput == null || cfs.getMemtableThroughputInMB()
> largestByThroughput.getMemtableThroughputInMB())
+ }
+ if (throughput > 0 && (largestByThroughput == null || throughput >
largestByThroughput.getMemtableThroughputInMB()))
+ {
+ logger_.debug(throughput + " total throughput in " + cfs);
largestByThroughput = cfs;
+ }
}
if (largestByOps == null)
{
- logger_.error("Unable to reduce heap usage since there are no
column families defined");
+ logger_.info("Unable to reduce heap usage since there are no dirty
column families");
return;
}