Author: jbellis
Date: Wed Feb 17 23:22:23 2010
New Revision: 911223
URL: http://svn.apache.org/viewvc?rev=911223&view=rev
Log:
use a sorted map for memtable contents so that range queries do not have to
sort the keys every time
patch by jbellis; reviewed by Stu Hood for CASSANDRA-799
Modified:
incubator/cassandra/trunk/CHANGES.txt
incubator/cassandra/trunk/conf/storage-conf.xml
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
Modified: incubator/cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/CHANGES.txt?rev=911223&r1=911222&r2=911223&view=diff
==============================================================================
--- incubator/cassandra/trunk/CHANGES.txt (original)
+++ incubator/cassandra/trunk/CHANGES.txt Wed Feb 17 23:22:23 2010
@@ -25,6 +25,10 @@
get_string_list_property
* jmx interface for tracking operation mode and streams in general.
(CASSANDRA-709)
+ * keep memtables in sorted order to improve range query performance
+ (CASSANDRA-799)
+ * use while loop instead of recursion when trimming sstables compaction list
+ to avoid blowing stack in pathological cases (CASSANDRA-804)
0.5.1
Modified: incubator/cassandra/trunk/conf/storage-conf.xml
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/conf/storage-conf.xml?rev=911223&r1=911222&r2=911223&view=diff
==============================================================================
--- incubator/cassandra/trunk/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/conf/storage-conf.xml Wed Feb 17 23:22:23 2010
@@ -308,7 +308,7 @@
~ ColumnFamily before flushing to disk. This is also a per-memtable
~ setting. Use with MemtableThroughputInMB to tune memory usage.
-->
- <MemtableOperationsInMillions>0.1</MemtableOperationsInMillions>
+ <MemtableOperationsInMillions>0.3</MemtableOperationsInMillions>
<!--
~ The maximum time to leave a dirty memtable unflushed.
~ (While any affected columnfamilies have unflushed data from a
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java?rev=911223&r1=911222&r2=911223&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
Wed Feb 17 23:22:23 2010
@@ -19,26 +19,22 @@
package org.apache.cassandra.db;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.cassandra.io.SSTableWriter;
-import org.apache.cassandra.io.SSTableReader;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.config.DatabaseDescriptor;
-
import org.apache.log4j.Logger;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.io.SSTableWriter;
+import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.WrappedRunnable;
import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import org.apache.cassandra.dht.IPartitioner;
public class BinaryMemtable implements IFlushable
{
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=911223&r1=911222&r2=911223&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Wed Feb 17 23:22:23 2010
@@ -90,14 +90,14 @@
Runtime.getRuntime().availableProcessors(),
Integer.MAX_VALUE,
TimeUnit.SECONDS,
- new
LinkedBlockingQueue<Runnable>(1 + 2 *
Runtime.getRuntime().availableProcessors()),
+ new
LinkedBlockingQueue<Runnable>(1 + Runtime.getRuntime().availableProcessors()),
new
NamedThreadFactory("FLUSH-SORTER-POOL"));
private static ExecutorService flushWriter_
- = new
JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getAllDataFileLocations().length,
+ = new JMXEnabledThreadPoolExecutor(1,
DatabaseDescriptor.getAllDataFileLocations().length,
Integer.MAX_VALUE,
TimeUnit.SECONDS,
- new
LinkedBlockingQueue<Runnable>(),
+ new
LinkedBlockingQueue<Runnable>(1 + 2 *
DatabaseDescriptor.getAllDataFileLocations().length),
new
NamedThreadFactory("FLUSH-WRITER-POOL"));
private static ExecutorService commitLogUpdater_ = new
JMXEnabledThreadPoolExecutor("MEMTABLE-POST-FLUSHER");
@@ -641,7 +641,7 @@
* flushing thread finishes sorting, which will almost always be longer
than any of the flushSorter threads proper
* (since, by definition, it started last).
*/
- Condition submitFlush(final IFlushable flushable)
+ Condition submitFlush(IFlushable flushable)
{
logger_.info("Enqueuing flush of " + flushable);
final Condition condition = new SimpleCondition();
@@ -692,12 +692,12 @@
}
}
- public Iterator<DecoratedKey> memtableKeyIterator() throws
ExecutionException, InterruptedException
+ public Iterator<DecoratedKey> memtableKeyIterator(DecoratedKey startWith)
throws ExecutionException, InterruptedException
{
Table.flusherLock.readLock().lock();
try
{
- return memtable_.getKeyIterator();
+ return memtable_.getKeyIterator(startWith);
}
finally
{
@@ -928,11 +928,11 @@
};
// current memtable keys. have to go through the CFS api for locking.
- iterators.add(Iterators.filter(memtableKeyIterator(), p));
+ iterators.add(Iterators.filter(memtableKeyIterator(startWith), p));
// historical memtables
for (Memtable memtable :
ColumnFamilyStore.getUnflushedMemtables(columnFamily_))
{
- iterators.add(Iterators.filter(memtable.getKeyIterator(), p));
+ iterators.add(Iterators.filter(memtable.getKeyIterator(startWith),
p));
}
// sstables
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java?rev=911223&r1=911222&r2=911223&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java
Wed Feb 17 23:22:23 2010
@@ -21,14 +21,9 @@
*/
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.Condition;
-import org.apache.cassandra.io.SSTableReader;
-
public interface IFlushable
{
public void flushAndSignal(Condition condition, ExecutorService sorter,
ExecutorService writer);
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=911223&r1=911222&r2=911223&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
Wed Feb 17 23:22:23 2010
@@ -20,25 +20,24 @@
import java.io.IOException;
import java.util.*;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
+import org.apache.log4j.Logger;
import org.apache.commons.lang.ArrayUtils;
import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.io.SSTableReader;
import org.apache.cassandra.io.SSTableWriter;
+import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.DestructivePQIterator;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.utils.WrappedRunnable;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
-import org.apache.log4j.Logger;
public class Memtable implements Comparable<Memtable>, IFlushable
{
@@ -55,7 +54,7 @@
private final String table;
private final String columnfamilyName;
private final long creationTime;
- private final NonBlockingHashMap<DecoratedKey, ColumnFamily>
columnFamilies = new NonBlockingHashMap<DecoratedKey, ColumnFamily>();
+ private final ConcurrentNavigableMap<DecoratedKey, ColumnFamily>
columnFamilies = new ConcurrentSkipListMap<DecoratedKey, ColumnFamily>();
private final IPartitioner partitioner = StorageService.getPartitioner();
Memtable(String table, String cfName)
@@ -144,30 +143,21 @@
return builder.toString();
}
- private List<DecoratedKey> getSortedKeys()
- {
- logger.info("Sorting " + this);
- // sort keys in the order they would be in when decorated
- ArrayList<DecoratedKey> orderedKeys = new
ArrayList<DecoratedKey>(columnFamilies.keySet());
- Collections.sort(orderedKeys);
- return orderedKeys;
- }
- private SSTableReader writeSortedContents(List<DecoratedKey> sortedKeys)
throws IOException
+ private SSTableReader writeSortedContents() throws IOException
{
logger.info("Writing " + this);
ColumnFamilyStore cfStore =
Table.open(table).getColumnFamilyStore(columnfamilyName);
SSTableWriter writer = new SSTableWriter(cfStore.getFlushPath(),
columnFamilies.size(), StorageService.getPartitioner());
DataOutputBuffer buffer = new DataOutputBuffer();
- for (DecoratedKey key : sortedKeys)
+ for (Map.Entry<DecoratedKey, ColumnFamily> entry :
columnFamilies.entrySet())
{
buffer.reset();
- ColumnFamily columnFamily = columnFamilies.get(key);
/* serialize the cf with column indexes */
- ColumnFamily.serializer().serializeWithIndexes(columnFamily,
buffer);
+ ColumnFamily.serializer().serializeWithIndexes(entry.getValue(),
buffer);
/* Now write the key and value to disk */
- writer.append(key, buffer);
+ writer.append(entry.getKey(), buffer);
}
SSTableReader ssTable =
writer.closeAndOpenReader(DatabaseDescriptor.getKeysCachedFraction(table,
columnfamilyName));
@@ -178,21 +168,14 @@
public void flushAndSignal(final Condition condition, ExecutorService
sorter, final ExecutorService writer)
{
ColumnFamilyStore.getMemtablesPendingFlushNotNull(columnfamilyName).add(this);
// it's ok for the MT to briefly be both active and pendingFlush
- sorter.submit(new Runnable()
+ writer.submit(new WrappedRunnable()
{
- public void run()
+ public void runMayThrow() throws IOException
{
- final List<DecoratedKey> sortedKeys = getSortedKeys();
- writer.submit(new WrappedRunnable()
- {
- public void runMayThrow() throws IOException
- {
- ColumnFamilyStore cfs =
Table.open(table).getColumnFamilyStore(columnfamilyName);
- cfs.addSSTable(writeSortedContents(sortedKeys));
-
ColumnFamilyStore.getMemtablesPendingFlushNotNull(columnfamilyName).remove(Memtable.this);
- condition.signalAll();
- }
- });
+ ColumnFamilyStore cfs =
Table.open(table).getColumnFamilyStore(columnfamilyName);
+ cfs.addSSTable(writeSortedContents());
+
ColumnFamilyStore.getMemtablesPendingFlushNotNull(columnfamilyName).remove(Memtable.this);
+ condition.signalAll();
}
});
}
@@ -202,18 +185,9 @@
return "Memtable(" + columnfamilyName + ")@" + hashCode();
}
- public Iterator<DecoratedKey> getKeyIterator()
+ public Iterator<DecoratedKey> getKeyIterator(DecoratedKey startWith)
{
- // even though we are using NBHM, it is okay to use size() twice here,
since size() will never decrease
- // w/in a single memtable's lifetime
- if (columnFamilies.size() == 0)
- {
- // cannot create a PQ of size zero (wtf?)
- return Arrays.asList(new DecoratedKey[0]).iterator();
- }
- PriorityQueue<DecoratedKey> pq = new
PriorityQueue<DecoratedKey>(columnFamilies.size());
- pq.addAll(columnFamilies.keySet());
- return new DestructivePQIterator<DecoratedKey>(pq);
+ return columnFamilies.navigableKeySet().tailSet(startWith).iterator();
}
public boolean isClean()