Author: jbellis
Date: Wed Oct 5 18:18:23 2011
New Revision: 1179359
URL: http://svn.apache.org/viewvc?rev=1179359&view=rev
Log:
fix truncate allowing data to be replayed post-restart
patch by jbellis; reviewed by slebresne for CASSANDRA-3297
Modified:
cassandra/branches/cassandra-0.8/CHANGES.txt
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Truncation.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
Modified: cassandra/branches/cassandra-0.8/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/CHANGES.txt?rev=1179359&r1=1179358&r2=1179359&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.8/CHANGES.txt Wed Oct 5 18:18:23 2011
@@ -1,3 +1,7 @@
+0.8.8
+ * fix truncate allowing data to be replayed post-restart (CASSANDRA-3297)
+
+
0.8.7
* Kill server on wrapped OOME such as from FileChannel.map (CASSANDRA-3201)
* Allow using quotes in "USE <keyspace>;" CLI command (CASSANDRA-3208)
@@ -30,6 +34,7 @@
* Make Pig storage handle implements LoadMetadata (CASSANDRA-2777)
* Improved CLI exceptions (CASSANDRA-3312)
+
0.8.6
* revert CASSANDRA-2388
* change TokenRange.endpoints back to listen/broadcast address to match
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1179359&r1=1179358&r2=1179359&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Wed Oct 5 18:18:23 2011
@@ -837,7 +837,7 @@ public class ColumnFamilyStore implement
if (clean)
{
- logger.debug("forceFlush requested but everything is clean");
+ logger.debug("forceFlush requested but everything is clean in {}", columnFamily);
return null;
}
@@ -1966,7 +1966,7 @@ public class ColumnFamilyStore implement
* @return a Future to the delete operation. Call the future's get() to make
* sure the column family has been deleted
*/
- public Future<?> truncate() throws IOException
+ public Future<?> truncate() throws IOException, ExecutionException, InterruptedException
{
// We have two goals here:
// - truncate should delete everything written before truncate was invoked
@@ -1984,11 +1984,15 @@ public class ColumnFamilyStore implement
//
// Bonus bonus: simply forceFlush of all the CF is not enough, because if
// for a given column family the memtable is clean, forceFlush will return
- // immediately, even though there could be a memtable being flush at the same
- // time. So to guarantee that all segments can be cleaned out, we need
+ // immediately, even though there could be a memtable being flushed at the same
+ // time. So to guarantee that all segments can be cleaned out, we need to
// "waitForActiveFlushes" after the new segment has been created.
+ logger.debug("truncating {}", columnFamily);
+ // flush the CF being truncated before forcing the new segment
+ forceBlockingFlush();
CommitLog.instance.forceNewSegment();
ReplayPosition position = CommitLog.instance.getContext();
+ // now flush everyone else. re-flushing ourselves is not necessary, but harmless
for (ColumnFamilyStore cfs : ColumnFamilyStore.all())
cfs.forceFlush();
waitForActiveFlushes();
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Truncation.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Truncation.java?rev=1179359&r1=1179358&r2=1179359&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Truncation.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/Truncation.java Wed Oct 5 18:18:23 2011
@@ -55,15 +55,6 @@ public class Truncation implements Messa
this.columnFamily = columnFamily;
}
- /**
- * This is equivalent to calling commit. Applies the changes to
- * to the table that is obtained by calling Table.open().
- */
- public void apply() throws IOException
- {
- Table.open(keyspace).getColumnFamilyStore(columnFamily).truncate();
- }
-
public Message getMessage(Integer version) throws IOException
{
ByteArrayOutputStream bos = new ByteArrayOutputStream();
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/commitlog/CommitLog.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/commitlog/CommitLog.java?rev=1179359&r1=1179358&r2=1179359&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/commitlog/CommitLog.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/commitlog/CommitLog.java Wed Oct 5 18:18:23 2011
@@ -120,6 +120,8 @@ public class CommitLog
public void resetUnsafe()
{
+ for (CommitLogSegment segment : segments)
+ segment.close();
segments.clear();
segments.add(new CommitLogSegment());
}
@@ -474,7 +476,6 @@ public class CommitLog
if (segment.isSafeToDelete() && iter.hasNext())
{
logger.info("Discarding obsolete commit log:" + segment);
- segment.close();
DeletionService.executeDelete(segment.getPath());
// usually this will be the first (remaining) segment, but not always, if segment A contains
// writes to a CF that is unflushed but is followed by segment B whose CFs are all flushed.
@@ -492,34 +493,26 @@ public class CommitLog
currentSegment().sync();
}
- public void forceNewSegment()
+ public void forceNewSegment() throws ExecutionException, InterruptedException
{
Callable<?> task = new Callable()
{
public Object call() throws IOException
{
- createNewSegment();
+ if (currentSegment().length() > 0)
+ createNewSegment();
return null;
}
};
- try
- {
- executor.submit(task).get();
- }
- catch (InterruptedException e)
- {
- throw new AssertionError(e);
- }
- catch (ExecutionException e)
- {
- throw new RuntimeException(e);
- }
+ executor.submit(task).get();
}
private void createNewSegment() throws IOException
{
+ assert !segments.isEmpty();
sync();
+ segments.getLast().close();
segments.add(new CommitLogSegment());
}
Modified:
cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java?rev=1179359&r1=1179358&r2=1179359&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java (original)
+++ cassandra/branches/cassandra-0.8/src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java Wed Oct 5 18:18:23 2011
@@ -47,6 +47,7 @@ public class CommitLogSegment
public final long id;
private final BufferedRandomAccessFile logWriter;
+ private long finalSize = -1;
// cache which cf is dirty in this segment to avoid having to lookup all ReplayPositions to decide if we could delete this segment
private Map<Integer, Integer> cfLastWrite = new HashMap<Integer, Integer>();
@@ -162,6 +163,9 @@ public class CommitLogSegment
public long length()
{
+ if (finalSize >= 0)
+ return finalSize;
+
try
{
return logWriter.length();
@@ -174,8 +178,12 @@ public class CommitLogSegment
public void close()
{
+ if (finalSize >= 0)
+ return;
+
try
{
+ finalSize = logWriter.length();
logWriter.close();
}
catch (IOException e)
Modified:
cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java?rev=1179359&r1=1179358&r2=1179359&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java (original)
+++ cassandra/branches/cassandra-0.8/test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java Wed Oct 5 18:18:23 2011
@@ -48,7 +48,7 @@ public class RecoveryManagerTruncateTest
RowMutation rm;
ColumnFamily cf;
- // trucate clears memtable
+ // add a single cell
rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes("keymulti"));
cf = ColumnFamily.create("Keyspace1", "Standard1");
cf.addColumn(column("col1", "val1", 1L));
@@ -60,22 +60,11 @@ public class RecoveryManagerTruncateTest
// and now truncate it
cfs.truncate().get();
+ CommitLog.instance.resetUnsafe();
CommitLog.recover();
// and validate truncation.
assertNull(getFromTable(table, "Standard1", "keymulti", "col1"));
-
- // truncate clears sstable
- rm = new RowMutation("Keyspace1", ByteBufferUtil.bytes("keymulti"));
- cf = ColumnFamily.create("Keyspace1", "Standard1");
- cf.addColumn(column("col1", "val1", 1L));
- rm.add(cf);
- rm.apply();
- cfs.forceBlockingFlush();
- cfs.truncate().get();
- CommitLog.instance.resetUnsafe();
- CommitLog.recover();
- assertNull(getFromTable(table, "Standard1", "keymulti", "col1"));
}
private IColumn getFromTable(Table table, String cfName, String keyName, String columnName)