Author: jbellis
Date: Tue May 24 17:43:32 2011
New Revision: 1127179
URL: http://svn.apache.org/viewvc?rev=1127179&view=rev
Log:
revert incomplete BMT-excision
Added:
cassandra/trunk/examples/bmt/
- copied from r1127164, cassandra/trunk/examples/bmt/
cassandra/trunk/examples/bmt/CassandraBulkLoader.java
- copied unchanged from r1127164,
cassandra/trunk/examples/bmt/CassandraBulkLoader.java
cassandra/trunk/examples/bmt/README.txt
- copied unchanged from r1127164, cassandra/trunk/examples/bmt/README.txt
cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
- copied unchanged from r1127164,
cassandra/trunk/src/java/org/apache/cassandra/db/BinaryMemtable.java
cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java
- copied unchanged from r1127164,
cassandra/trunk/src/java/org/apache/cassandra/db/IFlushable.java
cassandra/trunk/src/java/org/apache/cassandra/tools/GetVersion.java
- copied unchanged from r1126728,
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/tools/GetVersion.java
Modified:
cassandra/trunk/ (props changed)
cassandra/trunk/contrib/ (props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
(props changed)
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
(props changed)
cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
Propchange: cassandra/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue May 24 17:43:32 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
/cassandra/branches/cassandra-0.7:1026516-1127143
/cassandra/branches/cassandra-0.7.0:1053690-1055654
-/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1127170
+/cassandra/branches/cassandra-0.8:1090934-1125013,1125019-1127174
/cassandra/branches/cassandra-0.8.0:1125021-1127038
/cassandra/branches/cassandra-0.8.1:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3:1051699-1053689
Propchange: cassandra/trunk/contrib/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue May 24 17:43:32 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/contrib:922689-1052356,1052358-1053452,1053454,1053456-1068009
/cassandra/branches/cassandra-0.7/contrib:1026516-1127143
/cassandra/branches/cassandra-0.7.0/contrib:1053690-1055654
-/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1127170
+/cassandra/branches/cassandra-0.8/contrib:1090934-1125013,1125019-1127174
/cassandra/branches/cassandra-0.8.0/contrib:1125021-1127038
/cassandra/branches/cassandra-0.8.1/contrib:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/contrib:1051699-1053689
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue May 24 17:43:32 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1026516-1127143
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1127170
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1090934-1125013,1125019-1127174
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1125021-1127038
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Cassandra.java:1051699-1053689
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue May 24 17:43:32 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1026516-1127143
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1127170
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1090934-1125013,1125019-1127174
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1125021-1127038
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/Column.java:1051699-1053689
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue May 24 17:43:32 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1026516-1127143
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1127170
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1090934-1125013,1125019-1127174
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1125021-1127038
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/InvalidRequestException.java:1051699-1053689
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue May 24 17:43:32 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1026516-1127143
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1127170
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1090934-1125013,1125019-1127174
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1125021-1127038
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/NotFoundException.java:1051699-1053689
Propchange:
cassandra/trunk/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue May 24 17:43:32 2011
@@ -1,7 +1,7 @@
/cassandra/branches/cassandra-0.6/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:922689-1052356,1052358-1053452,1053454,1053456-1081914,1083000
/cassandra/branches/cassandra-0.7/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1026516-1127143
/cassandra/branches/cassandra-0.7.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1053690-1055654
-/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1127170
+/cassandra/branches/cassandra-0.8/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1090934-1125013,1125019-1127174
/cassandra/branches/cassandra-0.8.0/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1125021-1127038
/cassandra/branches/cassandra-0.8.1/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1101014-1125018
/cassandra/tags/cassandra-0.7.0-rc3/interface/thrift/gen-java/org/apache/cassandra/thrift/SuperColumn.java:1051699-1053689
Modified: cassandra/trunk/src/java/org/apache/cassandra/config/Config.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/Config.java?rev=1127179&r1=1127178&r2=1127179&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/config/Config.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/config/Config.java Tue May 24
17:43:32 2011
@@ -77,6 +77,8 @@ public class Config
public Boolean snapshot_before_compaction = false;
public Integer compaction_thread_priority = Thread.MIN_PRIORITY;
+ public Integer binary_memtable_throughput_in_mb = 256;
+
/* if the size of columns or super-columns are more than this, indexing
will kick in */
public Integer column_index_size_in_kb = 64;
public Integer in_memory_compaction_limit_in_mb = 256;
Modified:
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java?rev=1127179&r1=1127178&r2=1127179&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Tue May 24 17:43:32 2011
@@ -937,6 +937,11 @@ public class DatabaseDescriptor
return conf.sliced_buffer_size_in_kb;
}
+ public static int getBMTThreshold()
+ {
+ return conf.binary_memtable_throughput_in_mb;
+ }
+
public static int getCompactionThreadPriority()
{
return conf.compaction_thread_priority;
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1127179&r1=1127178&r2=1127179&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue
May 24 17:43:32 2011
@@ -25,6 +25,7 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
@@ -71,10 +72,13 @@ public class ColumnFamilyStore implement
private static Logger logger =
LoggerFactory.getLogger(ColumnFamilyStore.class);
/*
- * maybeSwitchMemtable puts Memtable.getSortedContents on the writer
executor. When the write is complete,
+ * submitFlush first puts [Binary]Memtable.getSortedContents on the
flushSorter executor,
+ * which then puts the sorted results on the writer executor. This is
because sorting is CPU-bound,
+ * and writing is disk-bound; we want to be able to do both at once. When
the write is complete,
* we turn the writer into an SSTableReader and add it to ssTables_ where
it is available for reads.
*
- * There are two other things that maybeSwitchMemtable does.
+ * For BinaryMemtable that's about all that happens. For live Memtables
there are two other things
+ * that switchMemtable does (which should be the only caller of
submitFlush in this case).
* First, it puts the Memtable into memtablesPendingFlush, where it stays
until the flush is complete
* and it's been added as an SSTableReader to ssTables_. Second, it adds
an entry to commitLogUpdater
* that waits for the flush to complete, then calls onMemtableFlush. This
allows multiple flushes
@@ -82,6 +86,13 @@ public class ColumnFamilyStore implement
* which is necessary for replay in case of a restart since CommitLog
assumes that when onMF is
* called, all data up to the given context has been persisted to SSTables.
*/
+ private static final ExecutorService flushSorter
+ = new
JMXEnabledThreadPoolExecutor(Runtime.getRuntime().availableProcessors(),
+ StageManager.KEEPALIVE,
+ TimeUnit.SECONDS,
+ new
LinkedBlockingQueue<Runnable>(Runtime.getRuntime().availableProcessors()),
+ new
NamedThreadFactory("FlushSorter"),
+ "internal");
private static final ExecutorService flushWriter
= new
JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
StageManager.KEEPALIVE,
@@ -123,6 +134,9 @@ public class ColumnFamilyStore implement
private final ConcurrentSkipListMap<ByteBuffer, ColumnFamilyStore>
indexedColumns;
+ // TODO binarymemtable ops are not threadsafe (do they need to be?)
+ private AtomicReference<BinaryMemtable> binaryMemtable;
+
private LatencyTracker readStats = new LatencyTracker();
private LatencyTracker writeStats = new LatencyTracker();
@@ -243,6 +257,7 @@ public class ColumnFamilyStore implement
this.keyCacheSaveInSeconds = new
DefaultInteger(metadata.getKeyCacheSavePeriodInSeconds());
this.partitioner = partitioner;
fileIndexGenerator.set(generation);
+ binaryMemtable = new AtomicReference<BinaryMemtable>(new
BinaryMemtable(this));
if (logger.isDebugEnabled())
logger.debug("Starting CFS {}", columnFamily);
@@ -643,11 +658,7 @@ public class ColumnFamilyStore implement
}
final CountDownLatch latch = new CountDownLatch(icc.size());
for (ColumnFamilyStore cfs : icc)
- {
- Memtable memtable = cfs.data.switchMemtable();
- logger.info("Enqueuing flush of {}", memtable);
- memtable.flushAndSignal(latch, flushWriter, ctx);
- }
+ submitFlush(cfs.data.switchMemtable(), latch, ctx);
// we marked our memtable as frozen as part of the concurrency
control,
// so even if there was nothing to flush we need to switch it out
@@ -688,6 +699,12 @@ public class ColumnFamilyStore implement
: DatabaseDescriptor.getCFMetaData(metadata.cfId) == null;
}
+ void switchBinaryMemtable(DecoratedKey key, ByteBuffer buffer)
+ {
+ binaryMemtable.set(new BinaryMemtable(this));
+ binaryMemtable.get().put(key, buffer);
+ }
+
public void forceFlushIfExpired()
{
if (getMemtableThreadSafe().isExpired())
@@ -718,6 +735,14 @@ public class ColumnFamilyStore implement
future.get();
}
+ public void forceFlushBinary()
+ {
+ if (binaryMemtable.get().isClean())
+ return;
+
+ submitFlush(binaryMemtable.get(), new CountDownLatch(1), null);
+ }
+
public void updateRowCache(DecoratedKey key, ColumnFamily columnFamily)
{
if (rowCache.isPutCopying())
@@ -768,6 +793,20 @@ public class ColumnFamilyStore implement
return flushRequested ? mt : null;
}
+ /*
+ * Insert/Update the column family for this key.
+ * Caller is responsible for acquiring Table.flusherLock!
+ * param @ lock - lock that needs to be used.
+ * param @ key - key for update/insert
+ * param @ columnFamily - columnFamily changes
+ */
+ void applyBinary(DecoratedKey key, ByteBuffer buffer)
+ {
+ long start = System.nanoTime();
+ binaryMemtable.get().put(key, buffer);
+ writeStats.addNano(System.nanoTime() - start);
+ }
+
public static ColumnFamily removeDeletedCF(ColumnFamily cf, int gcBefore)
{
// in case of a timestamp tie, tombstones get priority over
non-tombstones.
@@ -959,6 +998,21 @@ public class ColumnFamilyStore implement
}
}
+ /**
+ * submits flush sort on the flushSorter executor, which will in turn
submit to flushWriter when sorted.
+ * TODO because our executors use CallerRunsPolicy, when flushSorter fills
up, no writes will proceed
+ * because the next flush will start executing on the caller,
mutation-stage thread that has the
+ * flush write lock held. (writes acquire this as a read lock before
proceeding.)
+ * This is good, because it backpressures flushes, but bad, because we
can't write until that last
+ * flushing thread finishes sorting, which will almost always be longer
than any of the flushSorter threads proper
+ * (since, by definition, it started last).
+ */
+ void submitFlush(IFlushable flushable, CountDownLatch latch,
ReplayPosition context)
+ {
+ logger.info("Enqueuing flush of {}", flushable);
+ flushable.flushAndSignal(latch, flushSorter, flushWriter, context);
+ }
+
public long getMemtableColumnsCount()
{
return getMemtableThreadSafe().getOperations();
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=1127179&r1=1127178&r2=1127179&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Tue May 24
17:43:32 2011
@@ -46,7 +46,7 @@ import org.apache.cassandra.io.sstable.S
import org.apache.cassandra.utils.WrappedRunnable;
import org.github.jamm.MemoryMeter;
-public class Memtable implements Comparable<Memtable>
+public class Memtable implements Comparable<Memtable>, IFlushable
{
private static final Logger logger =
LoggerFactory.getLogger(Memtable.class);
@@ -256,7 +256,7 @@ public class Memtable implements Compara
return ssTable;
}
- public void flushAndSignal(final CountDownLatch latch, ExecutorService
writer, final ReplayPosition context)
+ public void flushAndSignal(final CountDownLatch latch, ExecutorService
sorter, final ExecutorService writer, final ReplayPosition context)
{
writer.execute(new WrappedRunnable()
{