Author: jbellis
Date: Thu Aug 11 19:36:06 2011
New Revision: 1156763
URL: http://svn.apache.org/viewvc?rev=1156763&view=rev
Log:
make sure truncate clears out the commitlog
patch by jbellis; reviewed by slebresne for CASSANDRA-2950
Modified:
cassandra/branches/cassandra-0.8/CHANGES.txt
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/SystemTable.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1156763&r1=1156762&r2=1156763&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Thu Aug 11 19:36:06 2011
@@ -1,6 +1,8 @@
0.8.5
* fix NPE when encryption_options is unspecified (CASSANDRA-3007)
* include column name in validation failure exceptions (CASSANDRA-2849)
+ * make sure truncate clears out the commitlog so replay won't re-
+ populate with truncated data (CASSANDRA-2950)
0.8.4
@@ -11,6 +13,7 @@
* switch back to only logging recent dropped messages (CASSANDRA-3004)
* always deserialize RowMutation for counters (CASSANDRA-3006)
* ignore saved replication_factor strategy_option for NTS (CASSANDRA-3011)
+ * make sure pre-truncate CL segments are discarded (CASSANDRA-2950)
0.8.3
@@ -206,7 +209,7 @@
* Disable compaction throttling during bootstrap (CASSANDRA-2612)
* fix CQL treatment of > and < operators in range slices (CASSANDRA-2592)
* fix potential double-application of counter updates on commitlog replay
- (CASSANDRA-2419)
+ by moving replay position from header to sstable metadata (CASSANDRA-2419)
* JDBC CQL driver exposes getColumn for access to timestamp
* JDBC ResultSetMetadata properties added to AbstractType
* r/m clustertool (CASSANDRA-2607)
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1156763&r1=1156762&r2=1156763&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Thu Aug 11 19:36:06 2011
@@ -671,6 +671,7 @@ public class ColumnFamilyStore implement
assert getMemtableThreadSafe() == oldMemtable;
oldMemtable.freeze();
final ReplayPosition ctx = writeCommitLog ?
CommitLog.instance.getContext() : ReplayPosition.NONE;
+ logger.debug("flush position is {}", ctx);
// submit the memtable for any indexed sub-cfses, and our own.
List<ColumnFamilyStore> icc = new
ArrayList<ColumnFamilyStore>(indexedColumns.size());
@@ -1838,6 +1839,37 @@ public class ColumnFamilyStore implement
}
/**
+ * Waits for flushes started BEFORE THIS METHOD IS CALLED to finish.
+ * Does NOT guarantee that no flush is active when it returns.
+ *
+ * Mechanism: an empty Runnable is submitted to postFlushExecutor while
+ * Table.switchLock's write lock is held; the lock is then released and
+ * the caller blocks on the Future of that empty task.
+ *
+ * NOTE(review): this presumably relies on postFlushExecutor running its
+ * tasks in submission order, so the empty task can only complete after
+ * previously queued post-flush work -- confirm against the executor's
+ * definition, which is not visible here.
+ *
+ * @throws AssertionError if the waiting thread is interrupted or the
+ * empty task reports an ExecutionException
+ */
+ private void waitForActiveFlushes()
+ {
+ Future<?> future;
+ Table.switchLock.writeLock().lock(); // hold the write lock only while enqueueing the marker task
+ try
+ {
+ future = postFlushExecutor.submit(new Runnable() { public void
run() { } }); // no-op task: only its completion matters
+ }
+ finally
+ {
+ Table.switchLock.writeLock().unlock(); // always release, even if submit() throws
+ }
+
+ try
+ {
+ future.get(); // block (outside the lock) until the marker task has run
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e); // interruption is treated as a fatal, unexpected condition
+ }
+ catch (ExecutionException e)
+ {
+ throw new AssertionError(e); // the task body is empty, so a failure here is unexpected
+ }
+ }
+
+ /**
* Truncate practically deletes the entire column family's data
* @return a Future to the delete operation. Call the future's get() to
make
* sure the column family has been deleted
@@ -1850,14 +1882,27 @@ public class ColumnFamilyStore implement
// We accomplish this by first flushing manually, then snapshotting,
and
// recording the timestamp IN BETWEEN those actions. Any sstables
created
// with this timestamp or greater time, will not be marked for delete.
- try
- {
- forceBlockingFlush();
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ //
+ // Bonus complication: since we store replay position in sstable
metadata,
+ // truncating those sstables means we will replay any CL segments from
the
+ // beginning if we restart before they are discarded for normal reasons
+ // post-truncate. So we need to (a) force a new segment so the
currently
+ // active one can be discarded, and (b) flush *all* CFs so that
unflushed
+ // data in others don't keep any pre-truncate CL segments alive.
+ //
+ // Bonus bonus: simply forceFlush of all the CF is not enough, because
if
+ // for a given column family the memtable is clean, forceFlush will
return
+ // immediately, even though there could be a memtable being flushed at
the same
+ // time. So to guarantee that all segments can be cleaned out, we need
+ // "waitForActiveFlushes" after the new segment has been created.
+ CommitLog.instance.forceNewSegment();
+ ReplayPosition position = CommitLog.instance.getContext();
+ for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
+ cfs.forceFlush();
+ waitForActiveFlushes();
+ // if everything was clean, flush won't have called discard
+ CommitLog.instance.discardCompletedSegments(metadata.cfId, position);
+
// sleep a little to make sure that our truncatedAt comes after any
sstable
// that was part of the flush we forced; otherwise on a tie, it
won't get deleted.
try
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/SystemTable.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/SystemTable.java?rev=1156763&r1=1156762&r2=1156763&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/SystemTable.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/SystemTable.java
Thu Aug 11 19:36:06 2011
@@ -81,9 +81,14 @@ public class SystemTable
ColumnFamily cf =
table.getColumnFamilyStore(STATUS_CF).getColumnFamily(dotSeven);
if (cf == null)
{
- // upgrading from 0.6 to 0.7.
- logger.info("Upgrading to 0.7. Purging hints if there are any. Old
hints will be snapshotted.");
- new Truncation(Table.SYSTEM_TABLE,
HintedHandOffManager.HINTS_CF).apply();
+ // 0.7+ marker not found. Remove hints and add the marker.
+ ColumnFamilyStore hintsCfs =
Table.open(Table.SYSTEM_TABLE).getColumnFamilyStore(HintedHandOffManager.HINTS_CF);
+ if (hintsCfs.getSSTables().size() > 0)
+ {
+ logger.info("Possible 0.6-format hints found. Snapshotting as
'old-hints' and purging");
+ hintsCfs.snapshot("old-hints");
+ hintsCfs.removeAllSSTables();
+ }
RowMutation rm = new RowMutation(Table.SYSTEM_TABLE, COOKIE_KEY);
rm.add(new QueryPath(STATUS_CF, null, hintsPurged6to7),
ByteBufferUtil.bytes("oh yes, it they were purged."),
System.currentTimeMillis());
rm.apply();
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1156763&r1=1156762&r2=1156763&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
(original)
+++
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
Thu Aug 11 19:36:06 2011
@@ -486,12 +486,42 @@ public class CommitLog
}
}
-
void sync() throws IOException // delegates to sync() on whatever currentSegment() returns; IOException propagates to the caller
{
currentSegment().sync();
}
+ public void forceNewSegment() // runs createNewSegment() as a task on `executor` and blocks until that task completes
+ {
+ Callable<?> task = new Callable() // NOTE(review): raw Callable type; a parameterized Callable<Object> would avoid it -- confirm executor.submit's signature first
+ {
+ public Object call() throws IOException
+ {
+ createNewSegment();
+ return null; // nothing to return; Callable is used so the IOException can propagate
+ }
+ };
+
+ try
+ {
+ executor.submit(task).get(); // submit and wait; presumably this serializes with other commit log work on the executor -- confirm
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError(e); // interruption while waiting is treated as unexpected
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e); // failure inside the task (e.g. the IOException declared by call()) surfaces as an unchecked exception wrapping the ExecutionException
+ }
+ }
+
+ private void createNewSegment() throws IOException // syncs the current segment, then appends a new CommitLogSegment to `segments`
+ {
+ sync(); // sync what currentSegment() returns before adding the new segment
+ segments.add(new CommitLogSegment()); // presumably the newly added segment becomes the one returned by currentSegment() -- confirm
+ }
+
// TODO this should be a Runnable since it doesn't actually return
anything, but it's difficult to do that
// without breaking the fragile CheaterFutureTask in BatchCLES.
class LogRecordAdder implements Callable, Runnable
@@ -510,10 +540,7 @@ public class CommitLog
currentSegment().write(rowMutation);
// roll log if necessary
if (currentSegment().length() >= segmentSize)
- {
- sync();
- segments.add(new CommitLogSegment());
- }
+ createNewSegment();
}
catch (IOException e)
{
Modified:
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java?rev=1156763&r1=1156762&r2=1156763&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
(original)
+++
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
Thu Aug 11 19:36:06 2011
@@ -73,6 +73,7 @@ public class RecoveryManagerTruncateTest
rm.apply();
cfs.forceBlockingFlush();
cfs.truncate().get();
+ CommitLog.instance.resetUnsafe();
CommitLog.recover();
assertNull(getFromTable(table, "Standard1", "keymulti",
"col1"));
}