Author: jbellis
Date: Fri Mar 11 17:54:08 2011
New Revision: 1080703
URL: http://svn.apache.org/viewvc?rev=1080703&view=rev
Log:
AES Counter Repair Improvements
patch by Alan Liang; reviewed by slebresne for CASSANDRA-2288
Modified:
cassandra/trunk/CHANGES.txt
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
cassandra/trunk/test/long/org/apache/cassandra/db/LongCompactionSpeedTest.java
cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
cassandra/trunk/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
Modified: cassandra/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1080703&r1=1080702&r2=1080703&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Mar 11 17:54:08 2011
@@ -1,7 +1,7 @@
0.8-dev
* avoid double RowMutation serialization on write path (CASSANDRA-1800)
* adds support for columns that act as incr/decr counters
- (CASSANDRA-1072, 1937, 1944, 1936, 2101, 2093)
+ (CASSANDRA-1072, 1937, 1944, 1936, 2101, 2093, 2288)
* make NetworkTopologyStrategy the default (CASSANDRA-1960)
* configurable internode encryption (CASSANDRA-1567)
* human readable column names in sstable2json output (CASSANDRA-1933)
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1080703&r1=1080702&r2=1080703&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
Fri Mar 11 17:54:08 2011
@@ -369,8 +369,7 @@ public class SSTableWriter extends SSTab
{
try
{
- dfile.close();
- iwriter.close();
+ close();
}
catch (IOException e)
{
@@ -379,6 +378,12 @@ public class SSTableWriter extends SSTab
}
}
+ void close() throws IOException
+ {
+ dfile.close();
+ iwriter.close();
+ }
+
protected long doIndexing() throws IOException
{
EstimatedHistogram rowSizes = SSTable.defaultRowHistogram();
@@ -424,9 +429,12 @@ public class SSTableWriter extends SSTab
*/
static class CommutativeRowIndexer extends RowIndexer
{
+ protected BufferedRandomAccessFile writerDfile;
+
CommutativeRowIndexer(Descriptor desc, CFMetaData metadata) throws
IOException
{
- super(desc, new BufferedRandomAccessFile(new
File(desc.filenameFor(SSTable.COMPONENT_DATA)), "rw", 8 * 1024 * 1024, true),
metadata);
+ super(desc, new BufferedRandomAccessFile(new
File(desc.filenameFor(SSTable.COMPONENT_DATA)), "r", 8 * 1024 * 1024, true),
metadata);
+ writerDfile = new BufferedRandomAccessFile(new
File(desc.filenameFor(SSTable.COMPONENT_DATA)), "rw", 8 * 1024 * 1024, true);
}
@Override
@@ -440,10 +448,14 @@ public class SSTableWriter extends SSTab
long readRowPosition = 0L;
long writeRowPosition = 0L;
- while (readRowPosition < dfile.length())
+
+ writerDfile.seek(writeRowPosition);
+ dfile.seek(readRowPosition);
+
+ long dfileLength = dfile.length();
+ while (readRowPosition < dfileLength)
{
// read key
- dfile.seek(readRowPosition);
diskKey = ByteBufferUtil.readWithShortLength(dfile);
// skip data size, bloom filter, column index
@@ -466,34 +478,40 @@ public class SSTableWriter extends SSTab
iwriter.afterAppend(key, writeRowPosition);
// write key
- dfile.seek(writeRowPosition);
- ByteBufferUtil.writeWithShortLength(diskKey, dfile);
+ ByteBufferUtil.writeWithShortLength(diskKey, writerDfile);
// write data size; serialize CF w/ bloom filter, column index
- long writeSizePosition = dfile.getFilePointer();
- dfile.writeLong(-1L);
- ColumnFamily.serializer().serializeWithIndexes(cf, dfile);
- long writeEndPosition = dfile.getFilePointer();
- dfile.seek(writeSizePosition);
- dfile.writeLong(writeEndPosition - (writeSizePosition + 8L));
+ long writeSizePosition = writerDfile.getFilePointer();
+ writerDfile.writeLong(-1L);
+ ColumnFamily.serializer().serializeWithIndexes(cf,
writerDfile);
+ long writeEndPosition = writerDfile.getFilePointer();
+ writerDfile.seek(writeSizePosition);
+ writerDfile.writeLong(writeEndPosition - (writeSizePosition +
8L));
writeRowPosition = writeEndPosition;
+ writerDfile.seek(writeRowPosition);
rows++;
-
- dfile.sync();
}
writeStatistics(desc, rowSizes, columnCounts);
if (writeRowPosition != readRowPosition)
{
// truncate file to new, reduced length
- dfile.setLength(writeRowPosition);
- dfile.sync();
+ writerDfile.setLength(writeRowPosition);
}
+ writerDfile.sync();
return rows;
}
+
+
+ @Override
+ void close() throws IOException
+ {
+ super.close();
+ writerDfile.close();
+ }
}
/**
Modified:
cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java?rev=1080703&r1=1080702&r2=1080703&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
Fri Mar 11 17:54:08 2011
@@ -185,6 +185,41 @@ public class BufferedRandomAccessFile ex
}
}
+ @Override
+ public void setLength(long newLength) throws IOException
+ {
+ if (newLength < 0)
+ throw new IllegalArgumentException();
+
+ // account for dirty data in buffers
+ if (isDirty)
+ {
+ if (newLength < bufferOffset)
+ {
+ // buffer is garbage
+ validBufferBytes = 0;
+ }
+ else if (newLength > (bufferOffset + validBufferBytes))
+ {
+ // flush everything in buffer
+ flush();
+ }
+ else // buffer within range
+ {
+ // truncate buffer and flush
+ validBufferBytes = (int)(newLength - bufferOffset);
+ flush();
+ }
+ }
+
+ // at this point all dirty buffer data is flushed
+ super.setLength(newLength);
+
+ validBufferBytes = 0;
+ current = newLength;
+ reBuffer();
+ }
+
private void reBuffer() throws IOException
{
flush(); // synchronizing buffer and file on disk
Modified:
cassandra/trunk/test/long/org/apache/cassandra/db/LongCompactionSpeedTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/long/org/apache/cassandra/db/LongCompactionSpeedTest.java?rev=1080703&r1=1080702&r2=1080703&view=diff
==============================================================================
---
cassandra/trunk/test/long/org/apache/cassandra/db/LongCompactionSpeedTest.java
(original)
+++
cassandra/trunk/test/long/org/apache/cassandra/db/LongCompactionSpeedTest.java
Fri Mar 11 17:54:08 2011
@@ -18,18 +18,24 @@
*/
package org.apache.cassandra.db;
+import java.io.IOException;
import java.net.InetAddress;
import java.util.*;
import org.apache.cassandra.Util;
import org.junit.Test;
-
+import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableUtils;
+import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.CleanupHelper;
import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.streaming.OperationType;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import java.nio.ByteBuffer;
+import org.apache.cassandra.io.sstable.Component;
import static junit.framework.Assert.assertEquals;
public class LongCompactionSpeedTest extends CleanupHelper
@@ -64,6 +70,33 @@ public class LongCompactionSpeedTest ext
testCompaction(100, 800, 5);
}
+ /**
+ * Test aes counter repair with a very wide row.
+ */
+ @Test
+ public void testAESCountersRepairWide() throws Exception
+ {
+ testAESCountersRepair(2, 1, 500000);
+ }
+
+ /**
+ * Test aes counter repair with lots of skinny rows.
+ */
+ @Test
+ public void testAESCountersRepairSlim() throws Exception
+ {
+ testAESCountersRepair(2, 500000, 1);
+ }
+
+ /**
+ * Test aes counter repair with lots of small sstables.
+ */
+ @Test
+ public void testAESCounterRepairMany() throws Exception
+ {
+ testAESCountersRepair(100, 1000, 5);
+ }
+
protected void testCompaction(int sstableCount, int rowsPerSSTable, int
colsPerRow) throws Exception
{
CompactionManager.instance.disableAutoCompaction();
@@ -103,4 +136,64 @@ public class LongCompactionSpeedTest ext
colsPerRow,
System.currentTimeMillis() - start));
}
+
+ protected void testAESCountersRepair(int sstableCount, final int
rowsPerSSTable, final int colsPerRow) throws Exception
+ {
+ final String cfName = "Counter1";
+ CompactionManager.instance.disableAutoCompaction();
+
+ ArrayList<SSTableReader> sstables = new ArrayList<SSTableReader>();
+ for (int k = 0; k < sstableCount; k++)
+ {
+ final int sstableNum = k;
+ SSTableReader sstable =
SSTableUtils.prepare().ks(TABLE1).cf(cfName).write(rowsPerSSTable, new
SSTableUtils.Appender(){
+ int written = 0;
+ public boolean append(SSTableWriter writer) throws IOException
+ {
+ if (written >= rowsPerSSTable) // stop after exactly rowsPerSSTable rows (written is 0-based)
+ return false;
+
+ DecoratedKey key = Util.dk(String.format("%020d",
written));
+ ColumnFamily cf = ColumnFamily.create(TABLE1, cfName);
+ for (int i = 0; i < colsPerRow; i++)
+ cf.addColumn(createCounterColumn(String.valueOf(i)));
+ writer.append(key, cf);
+ written++;
+ return true;
+ }
+ });
+
+ // whack the index to trigger the recover
+
FileUtils.deleteWithConfirm(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
+
FileUtils.deleteWithConfirm(sstable.descriptor.filenameFor(Component.FILTER));
+
+ sstables.add(sstable);
+ }
+
+ // give garbage collection a bit of time to catch up
+ Thread.sleep(1000);
+
+ long start = System.currentTimeMillis();
+
+ for (SSTableReader sstable : sstables)
+ CompactionManager.instance.submitSSTableBuild(sstable.descriptor,
OperationType.AES).get();
+
+ System.out.println(String.format("%s: sstables=%d rowsper=%d
colsper=%d: %d ms",
+ this.getClass().getName(),
+ sstableCount,
+ rowsPerSSTable,
+ colsPerRow,
+ System.currentTimeMillis() - start));
+ }
+
+ protected CounterColumn createCounterColumn(String name)
+ {
+ byte[] context = Util.concatByteArrays(
+ FBUtilities.getLocalAddress().getAddress(),
FBUtilities.toByteArray(9L), FBUtilities.toByteArray(3L),
+ FBUtilities.toByteArray(2), FBUtilities.toByteArray(4L),
FBUtilities.toByteArray(2L),
+ FBUtilities.toByteArray(4), FBUtilities.toByteArray(3L),
FBUtilities.toByteArray(3L),
+ FBUtilities.toByteArray(8), FBUtilities.toByteArray(2L),
FBUtilities.toByteArray(4L)
+ );
+ return new CounterColumn(ByteBufferUtil.bytes(name),
ByteBuffer.wrap(context), 0L);
+ }
}
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java?rev=1080703&r1=1080702&r2=1080703&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
(original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
Fri Mar 11 17:54:08 2011
@@ -30,6 +30,8 @@ import org.apache.cassandra.db.Decorated
import org.apache.cassandra.db.IColumn;
import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.Util;
+
public class SSTableUtils
{
// first configured table and cf
@@ -130,15 +132,23 @@ public class SSTableUtils
public SSTableReader write(Map<String, ColumnFamily> entries) throws
IOException
{
- Map<ByteBuffer, ByteBuffer> map = new HashMap<ByteBuffer,
ByteBuffer>();
+ SortedMap<DecoratedKey, ColumnFamily> sorted = new
TreeMap<DecoratedKey, ColumnFamily>();
for (Map.Entry<String, ColumnFamily> entry : entries.entrySet())
+ sorted.put(Util.dk(entry.getKey()), entry.getValue());
+
+ final Iterator<Map.Entry<DecoratedKey, ColumnFamily>> iter =
sorted.entrySet().iterator();
+ return write(sorted.size(), new Appender()
{
- DataOutputBuffer buffer = new DataOutputBuffer();
-
ColumnFamily.serializer().serializeWithIndexes(entry.getValue(), buffer);
- map.put(ByteBuffer.wrap(entry.getKey().getBytes()),
- ByteBuffer.wrap(buffer.asByteArray()));
- }
- return writeRaw(map);
+ @Override
+ public boolean append(SSTableWriter writer) throws IOException
+ {
+ if (!iter.hasNext())
+ return false;
+ Map.Entry<DecoratedKey, ColumnFamily> entry = iter.next();
+ writer.append(entry.getKey(), entry.getValue());
+ return true;
+ }
+ });
}
/**
@@ -148,16 +158,42 @@ public class SSTableUtils
{
File datafile = (dest == null) ? tempSSTableFile(ksname, cfname,
generation) : new File(dest.filenameFor(Component.DATA));
SSTableWriter writer = new
SSTableWriter(datafile.getAbsolutePath(), entries.size());
- SortedMap<DecoratedKey, ByteBuffer> sortedEntries = new
TreeMap<DecoratedKey, ByteBuffer>();
+ SortedMap<DecoratedKey, ByteBuffer> sorted = new
TreeMap<DecoratedKey, ByteBuffer>();
for (Map.Entry<ByteBuffer, ByteBuffer> entry : entries.entrySet())
-
sortedEntries.put(writer.partitioner.decorateKey(entry.getKey()),
entry.getValue());
- for (Map.Entry<DecoratedKey, ByteBuffer> entry :
sortedEntries.entrySet())
- writer.append(entry.getKey(), entry.getValue());
+ sorted.put(writer.partitioner.decorateKey(entry.getKey()),
entry.getValue());
+ final Iterator<Map.Entry<DecoratedKey, ByteBuffer>> iter =
sorted.entrySet().iterator();
+ return write(sorted.size(), new Appender()
+ {
+ @Override
+ public boolean append(SSTableWriter writer) throws IOException
+ {
+ if (!iter.hasNext())
+ return false;
+ Map.Entry<DecoratedKey, ByteBuffer> entry = iter.next();
+ writer.append(entry.getKey(), entry.getValue());
+ return true;
+ }
+ });
+ }
+
+ public SSTableReader write(int expectedSize, Appender appender) throws
IOException
+ {
+ File datafile = (dest == null) ? tempSSTableFile(ksname, cfname,
generation) : new File(dest.filenameFor(Component.DATA));
+ SSTableWriter writer = new
SSTableWriter(datafile.getAbsolutePath(), expectedSize);
+ long start = System.currentTimeMillis();
+ while (appender.append(writer)) { /* pass */ }
SSTableReader reader = writer.closeAndOpenReader();
+ // mark all components for removal
if (cleanup)
- for (Component comp : reader.components)
- new
File(reader.descriptor.filenameFor(comp)).deleteOnExit();
+ for (Component component : reader.components)
+ new
File(reader.descriptor.filenameFor(component)).deleteOnExit();
return reader;
}
}
+
+ public static abstract class Appender
+ {
+ /** Called with an open writer until it returns false. */
+ public abstract boolean append(SSTableWriter writer) throws
IOException;
+ }
}
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java?rev=1080703&r1=1080702&r2=1080703&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
Fri Mar 11 17:54:08 2011
@@ -662,4 +662,56 @@ public class BufferedRandomAccessFileTes
fout.close();
return f;
}
+
+ public void assertSetLength(BufferedRandomAccessFile file, long length)
throws IOException
+ {
+ assert file.getFilePointer() == length;
+ assert file.length() == file.getFilePointer();
+ assert file.getChannel().size() == file.length();
+ }
+
+ @Test
+ public void testSetLength() throws IOException
+ {
+ File tmpFile = File.createTempFile("set_length", "bin");
+ BufferedRandomAccessFile file = new BufferedRandomAccessFile(tmpFile,
"rw", 8*1024*1024);
+
+ // test that data in buffer is truncated
+ file.writeLong(1L);
+ file.writeLong(2L);
+ file.writeLong(3L);
+ file.writeLong(4L);
+ file.setLength(16L);
+ assertSetLength(file, 16L);
+
+ // seek back and truncate within file
+ file.writeLong(3L);
+ file.seek(8L);
+ file.setLength(24L);
+ assertSetLength(file, 24L);
+
+ // seek back and truncate past end of file
+ file.setLength(64L);
+ assertSetLength(file, 64L);
+
+ // make sure file is consistent after sync
+ file.sync();
+ assertSetLength(file, 64L);
+ }
+
+ @Test (expected=IllegalArgumentException.class)
+ public void testSetNegativeLength() throws IOException,
IllegalArgumentException
+ {
+ File tmpFile = File.createTempFile("set_negative_length", "bin");
+ BufferedRandomAccessFile file = new BufferedRandomAccessFile(tmpFile,
"rw", 8*1024*1024);
+ file.setLength(-8L);
+ }
+
+ @Test (expected=IOException.class)
+ public void testSetLengthDuringReadMode() throws IOException
+ {
+ File tmpFile = File.createTempFile("set_length_during_read_mode",
"bin");
+ BufferedRandomAccessFile file = new BufferedRandomAccessFile(tmpFile,
"r", 8*1024*1024);
+ file.setLength(4L);
+ }
}