Author: jbellis
Date: Fri Apr 1 22:22:30 2011
New Revision: 1087919
URL: http://svn.apache.org/viewvc?rev=1087919&view=rev
Log:
revert #1954 for now
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1087919&r1=1087918&r2=1087919&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Fri
Apr 1 22:22:30 2011
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.cache.AutoSavingCache;
import org.apache.cassandra.cache.AutoSavingKeyCache;
import org.apache.cassandra.cache.AutoSavingRowCache;
-import org.apache.cassandra.cache.JMXInstrumentedCache;
import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.concurrent.StageManager;
@@ -588,18 +587,33 @@ public class ColumnFamilyStore implement
/** flush the given memtable and swap in a new one for its CFS, if it
hasn't been frozen already. threadsafe. */
Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean
writeCommitLog)
{
- // Only one thread will succeed in marking it as pending flush; the
others can go back to processing writes
- if (!oldMemtable.markPendingFlush())
+ if (oldMemtable.isFrozen())
{
- logger.debug("memtable is already pending flush; another thread
must be flushing it");
+ logger.debug("memtable is already frozen; another thread must be
flushing it");
return null;
}
- assert getMemtableThreadSafe() == oldMemtable;
- // global synchronization ensures that we schedule
discardCompletedSegments calls in the same order as their
- // contexts (commitlog position) were read, even though the flush
executor is multithreaded.
- synchronized (ColumnFamilyStore.class)
+ /*
+ * If we can get the writelock, that means no new updates can come in
and
+ * all ongoing updates to memtables have completed. We can get the tail
+ * of the log and use it as the starting position for log replay on
recovery.
+ *
+ * This is why Table.switchLock needs to be global instead of
per-Table:
+ * we need to schedule discardCompletedSegments calls in the same
order as their
+ * contexts (commitlog position) were read, even though the flush
executor
+ * is multithreaded.
+ */
+ Table.switchLock.writeLock().lock();
+ try
{
+ if (oldMemtable.isFrozen())
+ {
+ logger.debug("memtable is already frozen; another thread must
be flushing it");
+ return null;
+ }
+
+ assert getMemtableThreadSafe() == oldMemtable;
+ oldMemtable.freeze();
final CommitLogSegment.CommitLogContext ctx = writeCommitLog ?
CommitLog.instance.getContext() : null;
// submit the memtable for any indexed sub-cfses, and our own.
@@ -612,9 +626,8 @@ public class ColumnFamilyStore implement
}
final CountDownLatch latch = new CountDownLatch(icc.size());
for (ColumnFamilyStore cfs : icc)
- {
submitFlush(cfs.data.switchMemtable(), latch);
- }
+
// we marked our memtable as frozen as part of the concurrency
control,
// so even if there was nothing to flush we need to switch it out
if (!icc.contains(this))
@@ -641,6 +654,10 @@ public class ColumnFamilyStore implement
}
});
}
+ finally
+ {
+ Table.switchLock.writeLock().unlock();
+ }
}
public boolean isDropped()
@@ -696,6 +713,8 @@ public class ColumnFamilyStore implement
/**
* Insert/Update the column family for this key.
+ * Caller is responsible for acquiring the Table.switchLock read lock!
+ * param @ lock - lock that needs to be used.
* param @ key - key for update/insert
* param @ columnFamily - columnFamily changes
*/
@@ -947,11 +966,14 @@ public class ColumnFamilyStore implement
}
/**
- * get the current memtable in a threadsafe fashion.
- * Returning memtable is ok because memtable is volatile, and thus
- * introduce a happens-before ordering.
+ * get the current memtable in a threadsafe fashion. note that simply
"return memtable_" is
+ * incorrect; you need to lock to introduce a thread safe happens-before
ordering.
+ *
+ * do NOT use this method to do either a put or get on the memtable
object, since it could be
+ * flushed in the meantime (and its executor terminated).
*
- * do NOT make this method public or it will really get impossible to
reason about these things.
+ * also do NOT make this method public or it will really get impossible to
reason about these things.
+ * @return
*/
private Memtable getMemtableThreadSafe()
{
@@ -998,10 +1020,10 @@ public class ColumnFamilyStore implement
return readStats.getTotalLatencyMicros();
}
- // TODO this actually isn't a good meature of pending tasks
+// TODO this actually isn't a good measure of pending tasks
public int getPendingTasks()
{
- return 0;
+ return Table.switchLock.getQueueLength();
}
public long getWriteCount()
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=1087919&r1=1087918&r2=1087919&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Fri Apr 1
22:22:30 2011
@@ -52,8 +52,7 @@ public class Memtable implements Compara
{
private static final Logger logger =
LoggerFactory.getLogger(Memtable.class);
- private final AtomicBoolean isPendingFlush = new AtomicBoolean(false);
- private final AtomicInteger activeWriters = new AtomicInteger(0);
+ private boolean isFrozen;
private final AtomicLong currentThroughput = new AtomicLong(0);
private final AtomicLong currentOperations = new AtomicLong(0);
@@ -106,30 +105,25 @@ public class Memtable implements Compara
return currentThroughput.get() >= this.THRESHOLD ||
currentOperations.get() >= this.THRESHOLD_COUNT;
}
- boolean isPendingFlush()
+ boolean isFrozen()
{
- return isPendingFlush.get();
+ return isFrozen;
}
- boolean markPendingFlush()
+ void freeze()
{
- return isPendingFlush.compareAndSet(false, true);
+ isFrozen = true;
}
/**
* Should only be called by ColumnFamilyStore.apply. NOT a public API.
+ * (CFS handles locking to avoid submitting an op
+ * to a flushing memtable. Any other way is unsafe.)
*/
void put(DecoratedKey key, ColumnFamily columnFamily)
{
- try
- {
- activeWriters.incrementAndGet();
- resolve(key, columnFamily);
- }
- finally
- {
- activeWriters.decrementAndGet();
- }
+ assert !isFrozen; // not 100% foolproof but hell, it's an assert
+ resolve(key, columnFamily);
}
private void resolve(DecoratedKey key, ColumnFamily cf)
@@ -178,7 +172,6 @@ public class Memtable implements Compara
{
public void runMayThrow() throws IOException
{
- waitForWriters();
cfs.flushLock.lock();
try
{
@@ -197,25 +190,6 @@ public class Memtable implements Compara
});
}
- /*
- * Wait for all writers to be done with this memtable before flushing.
- * A busy-wait is probably alright since we'll new wait long.
- */
- private void waitForWriters()
- {
- while (activeWriters.get() > 0)
- {
- try
- {
- Thread.sleep(3);
- }
- catch (InterruptedException e)
- {
- logger.error("Interrupted while waiting on writers.", e);
- }
- }
- }
-
public String toString()
{
return String.format("Memtable-%s@%s(%s bytes, %s operations)",
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1087919&r1=1087918&r2=1087919&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Fri Apr 1
22:22:30 2011
@@ -24,7 +24,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.*;
-import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
@@ -57,6 +57,14 @@ public class Table
private static final Logger logger = LoggerFactory.getLogger(Table.class);
private static final String SNAPSHOT_SUBDIR_NAME = "snapshots";
+ /**
+ * accesses to CFS.memtable should acquire this for thread safety.
+ * ColumnFamilyStore.maybeSwitchMemtable should acquire the writeLock; see that method
for the full explanation.
+ *
+ * (Enabling fairness in the RRWL is observed to decrease throughput, so
we leave it off.)
+ */
+ static final ReentrantReadWriteLock switchLock = new
ReentrantReadWriteLock();
+
// It is possible to call Table.open without a running daemon, so it makes
sense to ensure
// proper directories here as well as in CassandraDaemon.
static
@@ -371,64 +379,72 @@ public class Table
logger.debug("applying mutation of row {}",
ByteBufferUtil.bytesToHex(mutation.key()));
// write the mutation to the commitlog and memtables
- if (writeCommitLog)
- CommitLog.instance.add(mutation);
-
- DecoratedKey<?> key =
StorageService.getPartitioner().decorateKey(mutation.key());
- for (ColumnFamily cf : mutation.getColumnFamilies())
+ switchLock.readLock().lock();
+ try
{
- ColumnFamilyStore cfs = columnFamilyStores.get(cf.id());
- if (cfs == null)
+ if (writeCommitLog)
+ CommitLog.instance.add(mutation);
+
+ DecoratedKey<?> key =
StorageService.getPartitioner().decorateKey(mutation.key());
+ for (ColumnFamily cf : mutation.getColumnFamilies())
{
- logger.error("Attempting to mutate non-existant column family
" + cf.id());
- continue;
- }
+ ColumnFamilyStore cfs = columnFamilyStores.get(cf.id());
+ if (cfs == null)
+ {
+ logger.error("Attempting to mutate non-existant column
family " + cf.id());
+ continue;
+ }
- SortedSet<ByteBuffer> mutatedIndexedColumns = null;
- for (ByteBuffer column : cfs.getIndexedColumns())
- {
- if (cf.getColumnNames().contains(column) ||
cf.isMarkedForDelete())
+ SortedSet<ByteBuffer> mutatedIndexedColumns = null;
+ for (ByteBuffer column : cfs.getIndexedColumns())
{
- if (mutatedIndexedColumns == null)
- mutatedIndexedColumns = new TreeSet<ByteBuffer>();
- mutatedIndexedColumns.add(column);
- if (logger.isDebugEnabled())
+ if (cf.getColumnNames().contains(column) ||
cf.isMarkedForDelete())
{
- // can't actually use validator to print value here,
because we overload value
- // for deletion timestamp as well (which may not be a
well-formed value for the column type)
- ByteBuffer value = cf.getColumn(column) == null ? null
: cf.getColumn(column).value(); // may be null on row-level deletion
- logger.debug(String.format("mutating indexed column %s
value %s",
- cf.getComparator().getString(column),
- value == null ? "null" :
ByteBufferUtil.bytesToHex(value)));
+ if (mutatedIndexedColumns == null)
+ mutatedIndexedColumns = new TreeSet<ByteBuffer>();
+ mutatedIndexedColumns.add(column);
+ if (logger.isDebugEnabled())
+ {
+ // can't actually use validator to print value
here, because we overload value
+ // for deletion timestamp as well (which may not
be a well-formed value for the column type)
+ ByteBuffer value = cf.getColumn(column) == null ?
null : cf.getColumn(column).value(); // may be null on row-level deletion
+ logger.debug(String.format("mutating indexed
column %s value %s",
+
cf.getComparator().getString(column),
+ value == null ? "null"
: ByteBufferUtil.bytesToHex(value)));
+ }
}
}
- }
- synchronized (indexLockFor(mutation.key()))
- {
- ColumnFamily oldIndexedColumns = null;
- if (mutatedIndexedColumns != null)
+ synchronized (indexLockFor(mutation.key()))
{
- // with the raw data CF, we can just apply every update in
any order and let
- // read-time resolution throw out obsolete versions, thus
avoiding read-before-write.
- // but for indexed data we need to make sure that we're
not creating index entries
- // for obsolete writes.
- oldIndexedColumns = readCurrentIndexedColumns(key, cfs,
mutatedIndexedColumns);
- logger.debug("Pre-mutation index row is {}",
oldIndexedColumns);
- ignoreObsoleteMutations(cf, mutatedIndexedColumns,
oldIndexedColumns);
- }
+ ColumnFamily oldIndexedColumns = null;
+ if (mutatedIndexedColumns != null)
+ {
+ // with the raw data CF, we can just apply every
update in any order and let
+ // read-time resolution throw out obsolete versions,
thus avoiding read-before-write.
+ // but for indexed data we need to make sure that
we're not creating index entries
+ // for obsolete writes.
+ oldIndexedColumns = readCurrentIndexedColumns(key,
cfs, mutatedIndexedColumns);
+ logger.debug("Pre-mutation index row is {}",
oldIndexedColumns);
+ ignoreObsoleteMutations(cf, mutatedIndexedColumns,
oldIndexedColumns);
+ }
- Memtable fullMemtable = cfs.apply(key, cf);
- if (fullMemtable != null)
- memtablesToFlush = addFullMemtable(memtablesToFlush,
fullMemtable);
+ Memtable fullMemtable = cfs.apply(key, cf);
+ if (fullMemtable != null)
+ memtablesToFlush = addFullMemtable(memtablesToFlush,
fullMemtable);
- if (mutatedIndexedColumns != null)
- {
- // ignore full index memtables -- we flush those when the
"master" one is full
- applyIndexUpdates(mutation.key(), cf, cfs,
mutatedIndexedColumns, oldIndexedColumns);
+ if (mutatedIndexedColumns != null)
+ {
+ // ignore full index memtables -- we flush those when
the "master" one is full
+ applyIndexUpdates(mutation.key(), cf, cfs,
mutatedIndexedColumns, oldIndexedColumns);
+ }
}
}
}
+ finally
+ {
+ switchLock.readLock().unlock();
+ }
// flush memtables that got filled up outside the readlock
(maybeSwitchMemtable acquires writeLock).
// usually mTF will be empty and this will be a no-op.
@@ -582,12 +598,19 @@ public class Table
DecoratedKey<?> key = iter.next();
logger.debug("Indexing row {} ", key);
List<Memtable> memtablesToFlush = Collections.emptyList();
-
- synchronized (indexLockFor(key.key))
+ switchLock.readLock().lock();
+ try
+ {
+ synchronized (indexLockFor(key.key))
+ {
+ ColumnFamily cf = readCurrentIndexedColumns(key, cfs,
columns);
+ if (cf != null)
+ memtablesToFlush = applyIndexUpdates(key.key, cf,
cfs, cf.getColumnNames(), null);
+ }
+ }
+ finally
{
- ColumnFamily cf = readCurrentIndexedColumns(key, cfs,
columns);
- if (cf != null)
- memtablesToFlush = applyIndexUpdates(key.key, cf, cfs,
cf.getColumnNames(), null);
+ switchLock.readLock().unlock();
}
// during index build, we do flush index memtables separately
from master; otherwise we could OOM