Author: junrao
Date: Fri Sep 18 03:20:57 2009
New Revision: 816453
URL: http://svn.apache.org/viewvc?rev=816453&view=rev
Log:
commitlog may consider writes flushed, that are not yet; patched by junrao;
reviewed by jbellis for CASSANDRA-445
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=816453&r1=816452&r2=816453&view=diff
==============================================================================
---
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Fri Sep 18 03:20:57 2009
@@ -322,11 +322,28 @@
columnFamily_, SSTable.TEMPFILE_MARKER,
fileIndexGenerator_.incrementAndGet());
}
- void switchMemtable(Memtable oldMemtable, CommitLog.CommitLogContext ctx)
+ void switchMemtable(Memtable oldMemtable)
{
- memtableLock_.writeLock().lock();
+ CommitLog.CommitLogContext ctx = null;
+ /**
+ * If we can get the writelock, that means no new updates can come in
and
+ * all ongoing updates to memtables have completed. We can get the
tail
+ * of the log and use it as the starting position for log replay on
recovery.
+ *
+ * By holding the flusherLock_, we don't need the memetableLock any
more.
+ */
+ Table.flusherLock_.writeLock().lock();
try
{
+ try
+ {
+ ctx = CommitLog.open().getContext();
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+
if (oldMemtable.isFrozen())
{
return;
@@ -336,17 +353,17 @@
getMemtablesPendingFlushNotNull(columnFamily_).add(oldMemtable);
// it's ok for the MT to briefly be both active and pendingFlush
submitFlush(oldMemtable, ctx);
memtable_ = new Memtable(table_, columnFamily_);
+
+ if (memtableSwitchCount == Integer.MAX_VALUE)
+ {
+ memtableSwitchCount = 0;
+ }
+ memtableSwitchCount++;
}
finally
{
- memtableLock_.writeLock().unlock();
- }
-
- if (memtableSwitchCount == Integer.MAX_VALUE)
- {
- memtableSwitchCount = 0;
+ Table.flusherLock_.writeLock().unlock();
}
- memtableSwitchCount++;
}
void switchBinaryMemtable(String key, byte[] buffer) throws IOException
@@ -360,16 +377,7 @@
if (memtable_.isClean())
return;
- CommitLog.CommitLogContext ctx = null;
- try
- {
- ctx = CommitLog.open().getContext();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- switchMemtable(memtable_, ctx);
+ switchMemtable(memtable_);
}
void forceBlockingFlush() throws IOException, ExecutionException,
InterruptedException
@@ -402,25 +410,30 @@
* param @ key - key for update/insert
* param @ columnFamily - columnFamily changes
*/
- void apply(String key, ColumnFamily columnFamily,
CommitLog.CommitLogContext cLogCtx)
+ Memtable apply(String key, ColumnFamily columnFamily)
throws IOException
{
long start = System.currentTimeMillis();
Memtable initialMemtable = getMemtableThreadSafe();
+ boolean isFlush = false;
+
if (initialMemtable.isThresholdViolated())
{
- switchMemtable(initialMemtable, cLogCtx);
+ isFlush = true;
}
+
memtableLock_.writeLock().lock();
try
{
- memtable_.put(key, columnFamily);
+ initialMemtable.put(key, columnFamily);
}
finally
{
memtableLock_.writeLock().unlock();
}
writeStats_.add(System.currentTimeMillis() - start);
+
+ return isFlush ? initialMemtable : null;
}
/*
Modified:
incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java?rev=816453&r1=816452&r2=816453&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/CommitLog.java
Fri Sep 18 03:20:57 2009
@@ -268,10 +268,11 @@
logWriter_.seek(currentPos);
}
- private static void writeCommitLogHeader(RandomAccessFile logWriter,
byte[] bytes) throws IOException
+ private static void writeCommitLogHeader(BufferedRandomAccessFile
logWriter, byte[] bytes) throws IOException
{
logWriter.writeLong(bytes.length);
logWriter.write(bytes);
+ logWriter.sync();
}
void recover(File[] clogs) throws IOException
@@ -515,7 +516,7 @@
}
else
{
- RandomAccessFile logWriter =
CommitLog.createWriter(oldFile);
+ BufferedRandomAccessFile logWriter =
CommitLog.createWriter(oldFile);
writeCommitLogHeader(logWriter,
oldCommitLogHeader.toByteArray());
logWriter.close();
}
Modified: incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java?rev=816453&r1=816452&r2=816453&view=diff
==============================================================================
--- incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java
(original)
+++ incubator/cassandra/trunk/src/java/org/apache/cassandra/db/Table.java Fri
Sep 18 03:20:57 2009
@@ -23,6 +23,7 @@
import java.io.File;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.dht.BootstrapInitiateMessage;
@@ -48,6 +49,8 @@
private static Logger logger_ = Logger.getLogger(Table.class);
private static final String SNAPSHOT_SUBDIR_NAME = "snapshots";
+ /* we use this lock to drain updaters before calling a flush. */
+ static final ReentrantReadWriteLock flusherLock_ = new
ReentrantReadWriteLock(true);
/*
* This class represents the metadata of this Table. The metadata
@@ -590,12 +593,30 @@
*/
void apply(Row row) throws IOException
{
- CommitLog.CommitLogContext cLogCtx = CommitLog.open().add(row);
+ HashMap<ColumnFamilyStore,Memtable> memtablesToFlush = new
HashMap<ColumnFamilyStore, Memtable>();
- for (ColumnFamily columnFamily : row.getColumnFamilies())
+ flusherLock_.readLock().lock();
+ try
{
- ColumnFamilyStore cfStore =
columnFamilyStores_.get(columnFamily.name());
- cfStore.apply(row.key(), columnFamily, cLogCtx);
+ CommitLog.open().add(row);
+
+ for (ColumnFamily columnFamily : row.getColumnFamilies())
+ {
+ Memtable memtableToFlush;
+ ColumnFamilyStore cfStore =
columnFamilyStores_.get(columnFamily.name());
+ if ( (memtableToFlush=cfStore.apply(row.key(), columnFamily))
!= null)
+ memtablesToFlush.put(cfStore, memtableToFlush);
+ }
+ }
+ finally
+ {
+ flusherLock_.readLock().unlock();
+ }
+
+ if (memtablesToFlush.size() > 0)
+ {
+ for (Map.Entry<ColumnFamilyStore, Memtable> entry :
memtablesToFlush.entrySet())
+ entry.getKey().switchMemtable(entry.getValue());
}
}