Author: jbellis
Date: Wed Feb 17 23:22:41 2010
New Revision: 911224
URL: http://svn.apache.org/viewvc?rev=911224&view=rev
Log:
Refactor memtablesPendingFlush from a static map into a per-ColumnFamilyStore
member variable, and give Memtable a reference to its ColumnFamilyStore (CFS)
instead of a table/cfname pair.
patch by jbellis; reviewed by Stu Hood for CASSANDRA-799
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=911224&r1=911223&r2=911224&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Wed Feb 17 23:22:41 2010
@@ -61,7 +61,6 @@
import org.apache.commons.collections.IteratorUtils;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
import com.google.common.collect.Iterators;
import com.google.common.base.Predicate;
@@ -84,7 +83,6 @@
* which is necessary for replay in case of a restart since CommitLog
assumes that when onMF is
* called, all data up to the given context has been persisted to SSTables.
*/
- private static NonBlockingHashMap<String, Set<Memtable>>
memtablesPendingFlush = new NonBlockingHashMap<String, Set<Memtable>>();
private static ExecutorService flushSorter_
= new JMXEnabledThreadPoolExecutor(1,
Runtime.getRuntime().availableProcessors(),
@@ -103,6 +101,8 @@
private static final int KEY_RANGE_FILE_BUFFER_SIZE = 256 * 1024;
+ private Set<Memtable> memtablesPendingFlush = new
ConcurrentSkipListSet<Memtable>();
+
private final String table_;
public final String columnFamily_;
private final boolean isSuper_;
@@ -132,7 +132,7 @@
columnFamily_ = columnFamilyName;
isSuper_ = isSuper;
fileIndexGenerator_.set(indexValue);
- memtable_ = new Memtable(table_, columnFamily_);
+ memtable_ = new Memtable(this);
binaryMemtable_ = new AtomicReference<BinaryMemtable>(new
BinaryMemtable(this));
if (logger_.isDebugEnabled())
@@ -377,7 +377,7 @@
final CommitLogSegment.CommitLogContext ctx = writeCommitLog ?
CommitLog.instance().getContext() : null;
logger_.info(columnFamily_ + " has reached its threshold;
switching in a fresh Memtable at " + ctx);
final Condition condition = submitFlush(oldMemtable);
- memtable_ = new Memtable(table_, columnFamily_);
+ memtable_ = new Memtable(this);
// a second executor that makes sure the onMemtableFlushes get
called in the right order,
// while keeping the wait-for-flush (future.get) out of anything
latency-sensitive.
return commitLogUpdater_.submit(new WrappedRunnable()
@@ -616,22 +616,6 @@
ssTables_.replace(sstables, replacements);
}
- public static List<Memtable> getUnflushedMemtables(String cfName)
- {
- return new
ArrayList<Memtable>(getMemtablesPendingFlushNotNull(cfName));
- }
-
- static Set<Memtable> getMemtablesPendingFlushNotNull(String
columnFamilyName)
- {
- Set<Memtable> memtables = memtablesPendingFlush.get(columnFamilyName);
- if (memtables == null)
- {
- memtablesPendingFlush.putIfAbsent(columnFamilyName, new
ConcurrentSkipListSet<Memtable>());
- memtables = memtablesPendingFlush.get(columnFamilyName); // might
not be the object we just put, if there was a race!
- }
- return memtables;
- }
-
/**
* submits flush sort on the flushSorter executor, which will in turn
submit to flushWriter when sorted.
* TODO because our executors use CallerRunsPolicy, when flushSorter fills
up, no writes will proceed
@@ -854,8 +838,7 @@
iterators.add(iter);
/* add the memtables being flushed */
- List<Memtable> memtables =
getUnflushedMemtables(filter.getColumnFamilyName());
- for (Memtable memtable:memtables)
+ for (Memtable memtable : getMemtablesPendingFlush())
{
iter = filter.getMemColumnIterator(memtable, getComparator());
returnCF.delete(iter.getColumnFamily());
@@ -930,7 +913,7 @@
// current memtable keys. have to go through the CFS api for locking.
iterators.add(Iterators.filter(memtableKeyIterator(startWith), p));
// historical memtables
- for (Memtable memtable :
ColumnFamilyStore.getUnflushedMemtables(columnFamily_))
+ for (Memtable memtable : memtablesPendingFlush)
{
iterators.add(Iterators.filter(memtable.getKeyIterator(startWith),
p));
}
@@ -1179,4 +1162,10 @@
memtable_.clearUnsafe();
ssTables_.clearUnsafe();
}
+
+
+ public Set<Memtable> getMemtablesPendingFlush()
+ {
+ return memtablesPendingFlush;
+ }
}
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=911224&r1=911223&r2=911224&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
Wed Feb 17 23:22:41 2010
@@ -51,16 +51,15 @@
private final AtomicInteger currentThroughput = new AtomicInteger(0);
private final AtomicInteger currentOperations = new AtomicInteger(0);
- private final String table;
- private final String columnfamilyName;
private final long creationTime;
private final ConcurrentNavigableMap<DecoratedKey, ColumnFamily>
columnFamilies = new ConcurrentSkipListMap<DecoratedKey, ColumnFamily>();
private final IPartitioner partitioner = StorageService.getPartitioner();
+ private final ColumnFamilyStore cfs;
- Memtable(String table, String cfName)
+ public Memtable(ColumnFamilyStore cfs)
{
- this.table = table;
- columnfamilyName = cfName;
+
+ this.cfs = cfs;
creationTime = System.currentTimeMillis();
}
@@ -147,8 +146,7 @@
private SSTableReader writeSortedContents() throws IOException
{
logger.info("Writing " + this);
- ColumnFamilyStore cfStore =
Table.open(table).getColumnFamilyStore(columnfamilyName);
- SSTableWriter writer = new SSTableWriter(cfStore.getFlushPath(),
columnFamilies.size(), StorageService.getPartitioner());
+ SSTableWriter writer = new SSTableWriter(cfs.getFlushPath(),
columnFamilies.size(), StorageService.getPartitioner());
DataOutputBuffer buffer = new DataOutputBuffer();
for (Map.Entry<DecoratedKey, ColumnFamily> entry :
columnFamilies.entrySet())
@@ -160,21 +158,20 @@
writer.append(entry.getKey(), buffer);
}
- SSTableReader ssTable =
writer.closeAndOpenReader(DatabaseDescriptor.getKeysCachedFraction(table,
columnfamilyName));
+ SSTableReader ssTable =
writer.closeAndOpenReader(DatabaseDescriptor.getKeysCachedFraction(getTableName(),
cfs.getColumnFamilyName()));
logger.info("Completed flushing " + ssTable.getFilename());
return ssTable;
}
public void flushAndSignal(final Condition condition, ExecutorService
sorter, final ExecutorService writer)
{
- ColumnFamilyStore.getMemtablesPendingFlushNotNull(columnfamilyName).add(this); // it's ok for the MT to briefly be both active and pendingFlush
+ cfs.getMemtablesPendingFlush().add(this); // it's ok for the MT to briefly be both active and pendingFlush
writer.submit(new WrappedRunnable()
{
public void runMayThrow() throws IOException
{
- ColumnFamilyStore cfs =
Table.open(table).getColumnFamilyStore(columnfamilyName);
cfs.addSSTable(writeSortedContents());
-
ColumnFamilyStore.getMemtablesPendingFlushNotNull(columnfamilyName).remove(Memtable.this);
+ cfs.getMemtablesPendingFlush().remove(Memtable.this);
condition.signalAll();
}
});
@@ -182,7 +179,7 @@
public String toString()
{
- return "Memtable(" + columnfamilyName + ")@" + hashCode();
+ return "Memtable(" + cfs.getColumnFamilyName() + ")@" + hashCode();
}
public Iterator<DecoratedKey> getKeyIterator(DecoratedKey startWith)
@@ -195,19 +192,24 @@
return columnFamilies.isEmpty();
}
+ private String getTableName()
+ {
+ return cfs.getTable().name;
+ }
+
/**
* obtain an iterator of columns in this memtable in the specified order
starting from a given column.
*/
public ColumnIterator getSliceIterator(ColumnFamily cf, SliceQueryFilter
filter, AbstractType typeComparator)
{
- final ColumnFamily columnFamily = cf == null ?
ColumnFamily.create(table, filter.getColumnFamilyName()) : cf.cloneMeShallow();
+ final ColumnFamily columnFamily = cf == null ?
ColumnFamily.create(getTableName(), filter.getColumnFamilyName()) :
cf.cloneMeShallow();
final IColumn columns[] = (cf == null ? columnFamily :
cf).getSortedColumns().toArray(new
IColumn[columnFamily.getSortedColumns().size()]);
// TODO if we are dealing with supercolumns, we need to clone them
while we have the read lock since they can be modified later
if (filter.reversed)
ArrayUtils.reverse(columns);
IColumn startIColumn;
- final boolean isStandard =
DatabaseDescriptor.getColumnFamilyType(table,
filter.getColumnFamilyName()).equals("Standard");
+ final boolean isStandard =
DatabaseDescriptor.getColumnFamilyType(getTableName(),
filter.getColumnFamilyName()).equals("Standard");
if (isStandard)
startIColumn = new Column(filter.start);
else
@@ -252,8 +254,8 @@
public ColumnIterator getNamesIterator(final ColumnFamily cf, final
NamesQueryFilter filter)
{
- final ColumnFamily columnFamily = cf == null ?
ColumnFamily.create(table, filter.getColumnFamilyName()) : cf.cloneMeShallow();
- final boolean isStandard =
DatabaseDescriptor.getColumnFamilyType(table,
filter.getColumnFamilyName()).equals("Standard");
+ final ColumnFamily columnFamily = cf == null ?
ColumnFamily.create(getTableName(), filter.getColumnFamilyName()) :
cf.cloneMeShallow();
+ final boolean isStandard =
DatabaseDescriptor.getColumnFamilyType(getTableName(),
filter.getColumnFamilyName()).equals("Standard");
return new SimpleAbstractColumnIterator()
{