Author: jbellis Date: Tue Mar 1 03:22:46 2011 New Revision: 1075634 URL: http://svn.apache.org/viewvc?rev=1075634&view=rev Log: avoid acquiring (and contending with flush for) flusherLock on each write; patch by slebresne; reviewed by jbellis and stuhood for CASSANDRA-1954
Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Modified: cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1075634&r1=1075633&r2=1075634&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Tue Mar 1 03:22:46 2011 @@ -109,7 +109,7 @@ public class ColumnFamilyStore implement private AtomicInteger fileIndexGenerator = new AtomicInteger(0); /* active memtable associated with this ColumnFamilyStore. */ - private Memtable memtable; + private volatile Memtable memtable; private final ConcurrentSkipListMap<ByteBuffer, ColumnFamilyStore> indexedColumns; @@ -681,27 +681,21 @@ public class ColumnFamilyStore implement /** flush the given memtable and swap in a new one for its CFS, if it hasn't been frozen already. threadsafe. */ Future<?> maybeSwitchMemtable(Memtable oldMemtable, final boolean writeCommitLog) { - /* - * If we can get the writelock, that means no new updates can come in and - * all ongoing updates to memtables have completed. We can get the tail - * of the log and use it as the starting position for log replay on recovery. - * - * This is why we Table.flusherLock needs to be global instead of per-Table: - * we need to schedule discardCompletedSegments calls in the same order as their - * contexts (commitlog position) were read, even though the flush executor - * is multithreaded. 
- */ - Table.flusherLock.writeLock().lock(); + if (oldMemtable.isPendingFlush()) + return null; + + if (DatabaseDescriptor.getCFMetaData(metadata.cfId) == null) + return null; // column family was dropped. no point in flushing. + + // Only one thread will succeed in marking it as pending flush; the others can go back to processing writes + if (!oldMemtable.markPendingFlush()) + return null; + + // Table.flusherLock ensures that we schedule discardCompletedSegments calls in the same order as their + // contexts (commitlog position) were read, even though the flush executor is multithreaded. + Table.flusherLock.lock(); try { - if (oldMemtable.isFrozen()) - return null; - - if (DatabaseDescriptor.getCFMetaData(metadata.cfId) == null) - return null; // column family was dropped. no point in flushing. - - assert memtable == oldMemtable; - memtable.freeze(); final CommitLogSegment.CommitLogContext ctx = writeCommitLog ? CommitLog.instance.getContext() : null; logger.info("switching in a fresh Memtable for " + columnFamily + " at " + ctx); @@ -750,7 +744,7 @@ public class ColumnFamilyStore implement } finally { - Table.flusherLock.writeLock().unlock(); + Table.flusherLock.unlock(); if (memtableSwitchCount == Integer.MAX_VALUE) { memtableSwitchCount = 0; @@ -796,8 +790,6 @@ public class ColumnFamilyStore implement /** * Insert/Update the column family for this key. - * Caller is responsible for acquiring Table.flusherLock! - * param @ lock - lock that needs to be used. 
* param @ key - key for update/insert * param @ columnFamily - columnFamily changes */ @@ -805,14 +797,15 @@ public class ColumnFamilyStore implement { long start = System.nanoTime(); - boolean flushRequested = memtable.isThresholdViolated(); - memtable.put(key, columnFamily); + Memtable mt = getMemtableThreadSafe(); + boolean flushRequested = mt.isThresholdViolated(); + mt.put(key, columnFamily); ColumnFamily cachedRow = getRawCachedRow(key); if (cachedRow != null) cachedRow.addAll(columnFamily); writeStats.addNano(System.nanoTime() - start); - return flushRequested ? memtable : null; + return flushRequested ? mt : null; } /* @@ -1044,26 +1037,15 @@ public class ColumnFamilyStore implement } /** - * get the current memtable in a threadsafe fashion. note that simply "return memtable_" is - * incorrect; you need to lock to introduce a thread safe happens-before ordering. + * get the current memtable in a threadsafe fashion. + * Returning memtable is ok because memtable is volatile, and thus + * introduces a happens-before ordering. * - * do NOT use this method to do either a put or get on the memtable object, since it could be - * flushed in the meantime (and its executor terminated). - * - * also do NOT make this method public or it will really get impossible to reason about these things. - * @return + * do NOT make this method public or it will really get impossible to reason about these things.
*/ private Memtable getMemtableThreadSafe() { - Table.flusherLock.readLock().lock(); - try - { - return memtable; - } - finally - { - Table.flusherLock.readLock().unlock(); - } + return memtable; } public Collection<SSTableReader> getSSTables() @@ -1106,10 +1088,10 @@ public class ColumnFamilyStore implement return readStats.getTotalLatencyMicros(); } -// TODO this actually isn't a good meature of pending tasks + // TODO this actually isn't a good meature of pending tasks public int getPendingTasks() { - return Table.flusherLock.getQueueLength(); + return 0; } public long getWriteCount() Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=1075634&r1=1075633&r2=1075634&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Tue Mar 1 03:22:46 2011 @@ -29,6 +29,7 @@ import java.util.concurrent.ConcurrentNa import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -52,7 +53,8 @@ public class Memtable implements Compara { private static final Logger logger = LoggerFactory.getLogger(Memtable.class); - private boolean isFrozen; + private final AtomicBoolean isPendingFlush = new AtomicBoolean(false); + private final AtomicInteger activeWriters = new AtomicInteger(0); private final AtomicLong currentThroughput = new AtomicLong(0); private final AtomicLong currentOperations = new AtomicLong(0); @@ -105,25 +107,30 @@ public class Memtable implements Compara return currentThroughput.get() >= this.THRESHOLD || currentOperations.get() >= 
this.THRESHOLD_COUNT; } - boolean isFrozen() + boolean isPendingFlush() { - return isFrozen; + return isPendingFlush.get(); } - void freeze() + boolean markPendingFlush() { - isFrozen = true; + return isPendingFlush.compareAndSet(false, true); } /** * Should only be called by ColumnFamilyStore.apply. NOT a public API. - * (CFS handles locking to avoid submitting an op - * to a flushing memtable. Any other way is unsafe.) */ void put(DecoratedKey key, ColumnFamily columnFamily) { - assert !isFrozen; // not 100% foolproof but hell, it's an assert - resolve(key, columnFamily); + try + { + activeWriters.incrementAndGet(); + resolve(key, columnFamily); + } + finally + { + activeWriters.decrementAndGet(); + } } private void resolve(DecoratedKey key, ColumnFamily cf) @@ -173,6 +180,7 @@ public class Memtable implements Compara { public void runMayThrow() throws IOException { + waitForWriters(); if (!cfs.reverseReadWriteOrder()) { //XXX: race condition: may allow double reconcile; but never misses an MT @@ -190,6 +198,25 @@ public class Memtable implements Compara }); } + /* + * Wait for all writers to be done with this memtable before flushing. + * A busy-wait is probably alright since we'll never wait long.
+ */ + private void waitForWriters() + { + while (activeWriters.get() > 0) + { + try + { + Thread.sleep(3); + } + catch (InterruptedException e) + { + logger.error("Interrupted while waiting on writers.", e); + } + } + } + public String toString() { return String.format("Memtable-%s@%s(%s bytes, %s operations)", Modified: cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java?rev=1075634&r1=1075633&r2=1075634&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/RowIteratorFactory.java Tue Mar 1 03:22:46 2011 @@ -181,15 +181,7 @@ public class RowIteratorFactory */ private static Iterator<Map.Entry<DecoratedKey, ColumnFamily>> memtableEntryIterator(Memtable memtable, DecoratedKey startWith) { - Table.flusherLock.readLock().lock(); - try - { - return memtable.getEntryIterator(startWith); - } - finally - { - Table.flusherLock.readLock().unlock(); - } + return memtable.getEntryIterator(startWith); } /** Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Table.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=1075634&r1=1075633&r2=1075634&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/db/Table.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Tue Mar 1 03:22:46 2011 @@ -28,7 +28,7 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantLock; import com.google.common.base.Function; import 
com.google.common.collect.Iterables; @@ -61,12 +61,11 @@ public class Table private static final String SNAPSHOT_SUBDIR_NAME = "snapshots"; /** - * accesses to CFS.memtable should acquire this for thread safety. - * Table.maybeSwitchMemtable should aquire the writeLock; see that method for the full explanation. - * - * (Enabling fairness in the RRWL is observed to decrease throughput, so we leave it off.) + * ColumnFamilyStore.maybeSwitchMemtable acquires this lock when flushing. + * This is a global lock mainly for the benefit of Migration, so that it + * can block all flushing. */ - static final ReentrantReadWriteLock flusherLock = new ReentrantReadWriteLock(); + static final Lock flusherLock = new ReentrantLock(); // It is possible to call Table.open without a running daemon, so it makes sense to ensure // proper directories here as well as in CassandraDaemon. @@ -123,7 +122,7 @@ public class Table public static Lock getFlushLock() { - return flusherLock.writeLock(); + return flusherLock; } public static Table clear(String table) throws IOException @@ -357,72 +356,64 @@ public class Table logger.debug("applying mutation of row {}", ByteBufferUtil.bytesToHex(mutation.key())); // write the mutation to the commitlog and memtables - flusherLock.readLock().lock(); - try + if (writeCommitLog) + CommitLog.instance.add(mutation); + + DecoratedKey<?> key = StorageService.getPartitioner().decorateKey(mutation.key()); + for (ColumnFamily cf : mutation.getColumnFamilies()) { - if (writeCommitLog) - CommitLog.instance.add(mutation); - - DecoratedKey<?> key = StorageService.getPartitioner().decorateKey(mutation.key()); - for (ColumnFamily cf : mutation.getColumnFamilies()) + ColumnFamilyStore cfs = columnFamilyStores.get(cf.id()); + if (cfs == null) { - ColumnFamilyStore cfs = columnFamilyStores.get(cf.id()); - if (cfs == null) - { - logger.error("Attempting to mutate non-existant column family " + cf.id()); - continue; - } + logger.error("Attempting to mutate non-existant column family "
+ cf.id()); + continue; + } - SortedSet<ByteBuffer> mutatedIndexedColumns = null; - for (ByteBuffer column : cfs.getIndexedColumns()) + SortedSet<ByteBuffer> mutatedIndexedColumns = null; + for (ByteBuffer column : cfs.getIndexedColumns()) + { + if (cf.getColumnNames().contains(column) || cf.isMarkedForDelete()) { - if (cf.getColumnNames().contains(column) || cf.isMarkedForDelete()) + if (mutatedIndexedColumns == null) + mutatedIndexedColumns = new TreeSet<ByteBuffer>(); + mutatedIndexedColumns.add(column); + if (logger.isDebugEnabled()) { - if (mutatedIndexedColumns == null) - mutatedIndexedColumns = new TreeSet<ByteBuffer>(); - mutatedIndexedColumns.add(column); - if (logger.isDebugEnabled()) - { - // can't actually use validator to print value here, because we overload value - // for deletion timestamp as well (which may not be a well-formed value for the column type) - ByteBuffer value = cf.getColumn(column) == null ? null : cf.getColumn(column).value(); // may be null on row-level deletion - logger.debug(String.format("mutating indexed column %s value %s", - cf.getComparator().getString(column), - value == null ? "null" : ByteBufferUtil.bytesToHex(value))); - } + // can't actually use validator to print value here, because we overload value + // for deletion timestamp as well (which may not be a well-formed value for the column type) + ByteBuffer value = cf.getColumn(column) == null ? null : cf.getColumn(column).value(); // may be null on row-level deletion + logger.debug(String.format("mutating indexed column %s value %s", + cf.getComparator().getString(column), + value == null ? 
"null" : ByteBufferUtil.bytesToHex(value))); } } + } - synchronized (indexLockFor(mutation.key())) + synchronized (indexLockFor(mutation.key())) + { + ColumnFamily oldIndexedColumns = null; + if (mutatedIndexedColumns != null) { - ColumnFamily oldIndexedColumns = null; - if (mutatedIndexedColumns != null) - { - // with the raw data CF, we can just apply every update in any order and let - // read-time resolution throw out obsolete versions, thus avoiding read-before-write. - // but for indexed data we need to make sure that we're not creating index entries - // for obsolete writes. - oldIndexedColumns = readCurrentIndexedColumns(key, cfs, mutatedIndexedColumns); - logger.debug("Pre-mutation index row is {}", oldIndexedColumns); - ignoreObsoleteMutations(cf, mutatedIndexedColumns, oldIndexedColumns); - } + // with the raw data CF, we can just apply every update in any order and let + // read-time resolution throw out obsolete versions, thus avoiding read-before-write. + // but for indexed data we need to make sure that we're not creating index entries + // for obsolete writes. 
+ oldIndexedColumns = readCurrentIndexedColumns(key, cfs, mutatedIndexedColumns); + logger.debug("Pre-mutation index row is {}", oldIndexedColumns); + ignoreObsoleteMutations(cf, mutatedIndexedColumns, oldIndexedColumns); + } - Memtable fullMemtable = cfs.apply(key, cf); - if (fullMemtable != null) - memtablesToFlush = addFullMemtable(memtablesToFlush, fullMemtable); + Memtable fullMemtable = cfs.apply(key, cf); + if (fullMemtable != null) + memtablesToFlush = addFullMemtable(memtablesToFlush, fullMemtable); - if (mutatedIndexedColumns != null) - { - // ignore full index memtables -- we flush those when the "master" one is full - applyIndexUpdates(mutation.key(), cf, cfs, mutatedIndexedColumns, oldIndexedColumns); - } + if (mutatedIndexedColumns != null) + { + // ignore full index memtables -- we flush those when the "master" one is full + applyIndexUpdates(mutation.key(), cf, cfs, mutatedIndexedColumns, oldIndexedColumns); } } } - finally - { - flusherLock.readLock().unlock(); - } // flush memtables that got filled up outside the readlock (maybeSwitchMemtable acquires writeLock). // usually mTF will be empty and this will be a no-op. @@ -576,19 +567,12 @@ public class Table DecoratedKey<?> key = iter.next(); logger.debug("Indexing row {} ", key); List<Memtable> memtablesToFlush = Collections.emptyList(); - flusherLock.readLock().lock(); - try - { - synchronized (indexLockFor(key.key)) - { - ColumnFamily cf = readCurrentIndexedColumns(key, cfs, columns); - if (cf != null) - memtablesToFlush = applyIndexUpdates(key.key, cf, cfs, cf.getColumnNames(), null); - } - } - finally + + synchronized (indexLockFor(key.key)) { - flusherLock.readLock().unlock(); + ColumnFamily cf = readCurrentIndexedColumns(key, cfs, columns); + if (cf != null) + memtablesToFlush = applyIndexUpdates(key.key, cf, cfs, cf.getColumnNames(), null); } // during index build, we do flush index memtables separately from master; otherwise we could OOM