Author: jbellis
Date: Wed Feb 17 23:22:23 2010
New Revision: 911223

URL: http://svn.apache.org/viewvc?rev=911223&view=rev
Log:
use a sorted map for memtable contents so that range queries do not have to
sort the keys every time
patch by jbellis; reviewed by Stu Hood for CASSANDRA-799

Modified:
    incubator/cassandra/trunk/CHANGES.txt
    incubator/cassandra/trunk/conf/storage-conf.xml
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java
    incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java

Modified: incubator/cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/CHANGES.txt?rev=911223&r1=911222&r2=911223&view=diff
==============================================================================
--- incubator/cassandra/trunk/CHANGES.txt (original)
+++ incubator/cassandra/trunk/CHANGES.txt Wed Feb 17 23:22:23 2010
@@ -25,6 +25,10 @@
    get_string_list_property
  * jmx interface for tracking operation mode and streams in general.
    (CASSANDRA-709)
+ * keep memtables in sorted order to improve range query performance
+   (CASSANDRA-799)
+ * use while loop instead of recursion when trimming sstables compaction list 
+   to avoid blowing stack in pathological cases (CASSANDRA-804)
 
 
 0.5.1

Modified: incubator/cassandra/trunk/conf/storage-conf.xml
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/conf/storage-conf.xml?rev=911223&r1=911222&r2=911223&view=diff
==============================================================================
--- incubator/cassandra/trunk/conf/storage-conf.xml (original)
+++ incubator/cassandra/trunk/conf/storage-conf.xml Wed Feb 17 23:22:23 2010
@@ -308,7 +308,7 @@
    ~ ColumnFamily before flushing to disk.  This is also a per-memtable
    ~ setting.  Use with MemtableThroughputInMB to tune memory usage.
   -->
-  <MemtableOperationsInMillions>0.1</MemtableOperationsInMillions>
+  <MemtableOperationsInMillions>0.3</MemtableOperationsInMillions>
   <!--
    ~ The maximum time to leave a dirty memtable unflushed.
    ~ (While any affected columnfamilies have unflushed data from a

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java?rev=911223&r1=911222&r2=911223&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java 
(original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java 
Wed Feb 17 23:22:23 2010
@@ -19,26 +19,22 @@
 package org.apache.cassandra.db;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
-import org.apache.cassandra.io.SSTableWriter;
-import org.apache.cassandra.io.SSTableReader;
-import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.config.DatabaseDescriptor;
-
 import org.apache.log4j.Logger;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.IPartitioner;
+import org.apache.cassandra.io.SSTableReader;
+import org.apache.cassandra.io.SSTableWriter;
+import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.utils.WrappedRunnable;
 import org.cliffc.high_scale_lib.NonBlockingHashMap;
-import org.apache.cassandra.dht.IPartitioner;
 
 public class BinaryMemtable implements IFlushable
 {

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=911223&r1=911222&r2=911223&view=diff
==============================================================================
--- 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
 (original)
+++ 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
 Wed Feb 17 23:22:23 2010
@@ -90,14 +90,14 @@
                                                
Runtime.getRuntime().availableProcessors(),
                                                Integer.MAX_VALUE,
                                                TimeUnit.SECONDS,
-                                               new 
LinkedBlockingQueue<Runnable>(1 + 2 * 
Runtime.getRuntime().availableProcessors()),
+                                               new 
LinkedBlockingQueue<Runnable>(1 + Runtime.getRuntime().availableProcessors()),
                                                new 
NamedThreadFactory("FLUSH-SORTER-POOL"));
     private static ExecutorService flushWriter_
-            = new 
JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getAllDataFileLocations().length,
+            = new JMXEnabledThreadPoolExecutor(1,
                                                
DatabaseDescriptor.getAllDataFileLocations().length,
                                                Integer.MAX_VALUE,
                                                TimeUnit.SECONDS,
-                                               new 
LinkedBlockingQueue<Runnable>(),
+                                               new 
LinkedBlockingQueue<Runnable>(1 + 2 * 
DatabaseDescriptor.getAllDataFileLocations().length),
                                                new 
NamedThreadFactory("FLUSH-WRITER-POOL"));
     private static ExecutorService commitLogUpdater_ = new 
JMXEnabledThreadPoolExecutor("MEMTABLE-POST-FLUSHER");
 
@@ -641,7 +641,7 @@
      * flushing thread finishes sorting, which will almost always be longer 
than any of the flushSorter threads proper
      * (since, by definition, it started last).
      */
-    Condition submitFlush(final IFlushable flushable)
+    Condition submitFlush(IFlushable flushable)
     {
         logger_.info("Enqueuing flush of " + flushable);
         final Condition condition = new SimpleCondition();
@@ -692,12 +692,12 @@
         }
     }
 
-    public Iterator<DecoratedKey> memtableKeyIterator() throws 
ExecutionException, InterruptedException
+    public Iterator<DecoratedKey> memtableKeyIterator(DecoratedKey startWith) 
throws ExecutionException, InterruptedException
     {
         Table.flusherLock.readLock().lock();
         try
         {
-             return memtable_.getKeyIterator();
+             return memtable_.getKeyIterator(startWith);
         }
         finally
         {
@@ -928,11 +928,11 @@
         };
 
         // current memtable keys.  have to go through the CFS api for locking.
-        iterators.add(Iterators.filter(memtableKeyIterator(), p));
+        iterators.add(Iterators.filter(memtableKeyIterator(startWith), p));
         // historical memtables
         for (Memtable memtable : 
ColumnFamilyStore.getUnflushedMemtables(columnFamily_))
         {
-            iterators.add(Iterators.filter(memtable.getKeyIterator(), p));
+            iterators.add(Iterators.filter(memtable.getKeyIterator(startWith), 
p));
         }
 
         // sstables

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java?rev=911223&r1=911222&r2=911223&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java 
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java 
Wed Feb 17 23:22:23 2010
@@ -21,14 +21,9 @@
  */
 
 
-import java.io.IOException;
-import java.util.List;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.locks.Condition;
 
-import org.apache.cassandra.io.SSTableReader;
-
 public interface IFlushable
 {
     public void flushAndSignal(Condition condition, ExecutorService sorter, 
ExecutorService writer);

Modified: 
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: 
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=911223&r1=911222&r2=911223&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java 
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java 
Wed Feb 17 23:22:23 2010
@@ -20,25 +20,24 @@
 
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 
+import org.apache.log4j.Logger;
 import org.apache.commons.lang.ArrayUtils;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.filter.*;
+import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.IPartitioner;
-import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.io.SSTableReader;
 import org.apache.cassandra.io.SSTableWriter;
+import org.apache.cassandra.io.util.DataOutputBuffer;
 import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.DestructivePQIterator;
-import org.apache.cassandra.db.filter.*;
-import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.utils.WrappedRunnable;
-import org.cliffc.high_scale_lib.NonBlockingHashMap;
-
-import org.apache.log4j.Logger;
 
 public class Memtable implements Comparable<Memtable>, IFlushable
 {
@@ -55,7 +54,7 @@
     private final String table;
     private final String columnfamilyName;
     private final long creationTime;
-    private final NonBlockingHashMap<DecoratedKey, ColumnFamily> 
columnFamilies = new NonBlockingHashMap<DecoratedKey, ColumnFamily>();
+    private final ConcurrentNavigableMap<DecoratedKey, ColumnFamily> 
columnFamilies = new ConcurrentSkipListMap<DecoratedKey, ColumnFamily>();
     private final IPartitioner partitioner = StorageService.getPartitioner();
 
     Memtable(String table, String cfName)
@@ -144,30 +143,21 @@
         return builder.toString();
     }
 
-    private List<DecoratedKey> getSortedKeys()
-    {
-        logger.info("Sorting " + this);
-        // sort keys in the order they would be in when decorated
-        ArrayList<DecoratedKey> orderedKeys = new 
ArrayList<DecoratedKey>(columnFamilies.keySet());
-        Collections.sort(orderedKeys);
-        return orderedKeys;
-    }
 
-    private SSTableReader writeSortedContents(List<DecoratedKey> sortedKeys) 
throws IOException
+    private SSTableReader writeSortedContents() throws IOException
     {
         logger.info("Writing " + this);
         ColumnFamilyStore cfStore = 
Table.open(table).getColumnFamilyStore(columnfamilyName);
         SSTableWriter writer = new SSTableWriter(cfStore.getFlushPath(), 
columnFamilies.size(), StorageService.getPartitioner());
 
         DataOutputBuffer buffer = new DataOutputBuffer();
-        for (DecoratedKey key : sortedKeys)
+        for (Map.Entry<DecoratedKey, ColumnFamily> entry : 
columnFamilies.entrySet())
         {
             buffer.reset();
-            ColumnFamily columnFamily = columnFamilies.get(key);
             /* serialize the cf with column indexes */
-            ColumnFamily.serializer().serializeWithIndexes(columnFamily, 
buffer);
+            ColumnFamily.serializer().serializeWithIndexes(entry.getValue(), 
buffer);
             /* Now write the key and value to disk */
-            writer.append(key, buffer);
+            writer.append(entry.getKey(), buffer);
         }
 
         SSTableReader ssTable = 
writer.closeAndOpenReader(DatabaseDescriptor.getKeysCachedFraction(table, 
columnfamilyName));
@@ -178,21 +168,14 @@
     public void flushAndSignal(final Condition condition, ExecutorService 
sorter, final ExecutorService writer)
     {
         
ColumnFamilyStore.getMemtablesPendingFlushNotNull(columnfamilyName).add(this); 
// it's ok for the MT to briefly be both active and pendingFlush
-        sorter.submit(new Runnable()
+        writer.submit(new WrappedRunnable()
         {
-            public void run()
+            public void runMayThrow() throws IOException
             {
-                final List<DecoratedKey> sortedKeys = getSortedKeys();
-                writer.submit(new WrappedRunnable()
-                {
-                    public void runMayThrow() throws IOException
-                    {
-                        ColumnFamilyStore cfs = 
Table.open(table).getColumnFamilyStore(columnfamilyName);
-                        cfs.addSSTable(writeSortedContents(sortedKeys));
-                        
ColumnFamilyStore.getMemtablesPendingFlushNotNull(columnfamilyName).remove(Memtable.this);
-                        condition.signalAll();
-                    }
-                });
+                ColumnFamilyStore cfs = 
Table.open(table).getColumnFamilyStore(columnfamilyName);
+                cfs.addSSTable(writeSortedContents());
+                
ColumnFamilyStore.getMemtablesPendingFlushNotNull(columnfamilyName).remove(Memtable.this);
+                condition.signalAll();
             }
         });
     }
@@ -202,18 +185,9 @@
         return "Memtable(" + columnfamilyName + ")@" + hashCode();
     }
 
-    public Iterator<DecoratedKey> getKeyIterator()
+    public Iterator<DecoratedKey> getKeyIterator(DecoratedKey startWith)
     {
-        // even though we are using NBHM, it is okay to use size() twice here, 
since size() will never decrease
-        // w/in a single memtable's lifetime
-        if (columnFamilies.size() == 0)
-        {
-            // cannot create a PQ of size zero (wtf?)
-            return Arrays.asList(new DecoratedKey[0]).iterator();
-        }
-        PriorityQueue<DecoratedKey> pq = new 
PriorityQueue<DecoratedKey>(columnFamilies.size());
-        pq.addAll(columnFamilies.keySet());
-        return new DestructivePQIterator<DecoratedKey>(pq);
+        return columnFamilies.navigableKeySet().tailSet(startWith).iterator();
     }
 
     public boolean isClean()


Reply via email to