Author: jbellis
Date: Fri Mar 11 17:54:08 2011
New Revision: 1080703

URL: http://svn.apache.org/viewvc?rev=1080703&view=rev
Log:
AES Counter Repair Improvements
patch by Alan Liang; reviewed by slebresne for CASSANDRA-2288

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
    cassandra/trunk/test/long/org/apache/cassandra/db/LongCompactionSpeedTest.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java

Modified: cassandra/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1080703&r1=1080702&r2=1080703&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Fri Mar 11 17:54:08 2011
@@ -1,7 +1,7 @@
 0.8-dev
  * avoid double RowMutation serialization on write path (CASSANDRA-1800)
  * adds support for columns that act as incr/decr counters 
-   (CASSANDRA-1072, 1937, 1944, 1936, 2101, 2093)
+   (CASSANDRA-1072, 1937, 1944, 1936, 2101, 2093, 2288)
  * make NetworkTopologyStrategy the default (CASSANDRA-1960)
  * configurable internode encryption (CASSANDRA-1567)
  * human readable column names in sstable2json output (CASSANDRA-1933)

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1080703&r1=1080702&r2=1080703&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java Fri Mar 11 17:54:08 2011
@@ -369,8 +369,7 @@ public class SSTableWriter extends SSTab
             {
                 try
                 {
-                    dfile.close();
-                    iwriter.close();
+                    close();
                 }
                 catch (IOException e)
                 {
@@ -379,6 +378,12 @@ public class SSTableWriter extends SSTab
             }
         }
 
+        void close() throws IOException
+        {
+            dfile.close();
+            iwriter.close();
+        }
+
         protected long doIndexing() throws IOException
         {
             EstimatedHistogram rowSizes = SSTable.defaultRowHistogram();
@@ -424,9 +429,12 @@ public class SSTableWriter extends SSTab
      */
     static class CommutativeRowIndexer extends RowIndexer
     {
+        protected BufferedRandomAccessFile writerDfile;
+
         CommutativeRowIndexer(Descriptor desc, CFMetaData metadata) throws 
IOException
         {
-            super(desc, new BufferedRandomAccessFile(new 
File(desc.filenameFor(SSTable.COMPONENT_DATA)), "rw", 8 * 1024 * 1024, true), 
metadata);
+            super(desc, new BufferedRandomAccessFile(new 
File(desc.filenameFor(SSTable.COMPONENT_DATA)), "r", 8 * 1024 * 1024, true), 
metadata);
+            writerDfile = new BufferedRandomAccessFile(new 
File(desc.filenameFor(SSTable.COMPONENT_DATA)), "rw", 8 * 1024 * 1024, true);
         }
 
         @Override
@@ -440,10 +448,14 @@ public class SSTableWriter extends SSTab
 
             long readRowPosition  = 0L;
             long writeRowPosition = 0L;
-            while (readRowPosition < dfile.length())
+
+            writerDfile.seek(writeRowPosition);
+            dfile.seek(readRowPosition);
+
+            long dfileLength = dfile.length();
+            while (readRowPosition < dfileLength)
             {
                 // read key
-                dfile.seek(readRowPosition);
                 diskKey = ByteBufferUtil.readWithShortLength(dfile);
 
                 // skip data size, bloom filter, column index
@@ -466,34 +478,40 @@ public class SSTableWriter extends SSTab
                 iwriter.afterAppend(key, writeRowPosition);
 
                 // write key
-                dfile.seek(writeRowPosition);
-                ByteBufferUtil.writeWithShortLength(diskKey, dfile);
+                ByteBufferUtil.writeWithShortLength(diskKey, writerDfile);
 
                 // write data size; serialize CF w/ bloom filter, column index
-                long writeSizePosition = dfile.getFilePointer();
-                dfile.writeLong(-1L);
-                ColumnFamily.serializer().serializeWithIndexes(cf, dfile);
-                long writeEndPosition = dfile.getFilePointer();
-                dfile.seek(writeSizePosition);
-                dfile.writeLong(writeEndPosition - (writeSizePosition + 8L));
+                long writeSizePosition = writerDfile.getFilePointer();
+                writerDfile.writeLong(-1L);
+                ColumnFamily.serializer().serializeWithIndexes(cf, 
writerDfile);
+                long writeEndPosition = writerDfile.getFilePointer();
+                writerDfile.seek(writeSizePosition);
+                writerDfile.writeLong(writeEndPosition - (writeSizePosition + 
8L));
 
                 writeRowPosition = writeEndPosition;
+                writerDfile.seek(writeRowPosition);
 
                 rows++;
-
-                dfile.sync();
             }
             writeStatistics(desc, rowSizes, columnCounts);
 
             if (writeRowPosition != readRowPosition)
             {
                 // truncate file to new, reduced length
-                dfile.setLength(writeRowPosition);
-                dfile.sync();
+                writerDfile.setLength(writeRowPosition);
             }
+            writerDfile.sync();
 
             return rows;
         }
+
+
+        @Override
+        void close() throws IOException
+        {
+            super.close();
+            writerDfile.close();
+        }
     }
 
     /**

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java?rev=1080703&r1=1080702&r2=1080703&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/BufferedRandomAccessFile.java Fri Mar 11 17:54:08 2011
@@ -185,6 +185,41 @@ public class BufferedRandomAccessFile ex
         }
     }
 
+    @Override
+    public void setLength(long newLength) throws IOException // Resizes the file after reconciling any dirty buffered bytes with newLength.
+    {
+        if (newLength < 0)
+            throw new IllegalArgumentException(); // negative lengths are rejected before the file is touched
+
+        // account for dirty data in buffers
+        if (isDirty)
+        {
+            if (newLength < bufferOffset)
+            {
+                // buffer is garbage
+                validBufferBytes = 0; // whole buffer lies past newLength; NOTE(review): isDirty stays set, so this relies on flush() tolerating an empty dirty buffer -- confirm
+            }
+            else if (newLength > (bufferOffset + validBufferBytes))
+            {
+                // flush everything in buffer
+                flush();
+            }
+            else // buffer within range
+            {
+                // truncate buffer and flush
+                validBufferBytes = (int)(newLength - bufferOffset); // keep only bytes before newLength; the cast is safe because newLength falls inside the buffer here
+                flush();
+            }
+        }
+
+        // at this point dirty data below newLength is flushed (dirty data entirely past newLength was discarded above)
+        super.setLength(newLength);
+
+        validBufferBytes = 0; // drop buffered bytes, which may be stale after the resize
+        current = newLength; // NOTE: always moves the pointer to newLength, even when growing; java.io.RandomAccessFile.setLength only moves it when truncating below the pointer
+        reBuffer();
+    }
+
     private void reBuffer() throws IOException
     {
         flush(); // synchronizing buffer and file on disk

Modified: cassandra/trunk/test/long/org/apache/cassandra/db/LongCompactionSpeedTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/long/org/apache/cassandra/db/LongCompactionSpeedTest.java?rev=1080703&r1=1080702&r2=1080703&view=diff
==============================================================================
--- cassandra/trunk/test/long/org/apache/cassandra/db/LongCompactionSpeedTest.java (original)
+++ cassandra/trunk/test/long/org/apache/cassandra/db/LongCompactionSpeedTest.java Fri Mar 11 17:54:08 2011
@@ -18,18 +18,24 @@
 */
 package org.apache.cassandra.db;
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.util.*;
 
 import org.apache.cassandra.Util;
 
 import org.junit.Test;
-
+import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.io.sstable.SSTableUtils;
+import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.CleanupHelper;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.streaming.OperationType;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import java.nio.ByteBuffer;
+import org.apache.cassandra.io.sstable.Component;
 import static junit.framework.Assert.assertEquals;
 
 public class LongCompactionSpeedTest extends CleanupHelper
@@ -64,6 +70,33 @@ public class LongCompactionSpeedTest ext
         testCompaction(100, 800, 5);
     }
 
+    /**
+     * Test aes counter repair with a very wide row.
+     */
+    @Test
+    public void testAESCountersRepairWide() throws Exception
+    {
+        testAESCountersRepair(2, 1, 500000);
+    }
+
+    /**
+     * Test aes counter repair with lots of skinny rows.
+     */
+    @Test
+    public void testAESCountersRepairSlim() throws Exception
+    {
+        testAESCountersRepair(2, 500000, 1);
+    }
+
+    /**
+     * Test aes counter repair with lots of small sstables.
+     */
+    @Test
+    public void testAESCounterRepairMany() throws Exception
+    {
+        testAESCountersRepair(100, 1000, 5);
+    }
+
     protected void testCompaction(int sstableCount, int rowsPerSSTable, int 
colsPerRow) throws Exception
     {
         CompactionManager.instance.disableAutoCompaction();
@@ -103,4 +136,64 @@ public class LongCompactionSpeedTest ext
                                          colsPerRow,
                                          System.currentTimeMillis() - start));
     }
+
+    protected void testAESCountersRepair(int sstableCount, final int 
rowsPerSSTable, final int colsPerRow) throws Exception
+    {
+        final String cfName = "Counter1";
+        CompactionManager.instance.disableAutoCompaction();
+
+        ArrayList<SSTableReader> sstables = new ArrayList<SSTableReader>();
+        for (int k = 0; k < sstableCount; k++)
+        {
+            final int sstableNum = k;
+            SSTableReader sstable = 
SSTableUtils.prepare().ks(TABLE1).cf(cfName).write(rowsPerSSTable, new 
SSTableUtils.Appender(){
+                int written = 0;
+                public boolean append(SSTableWriter writer) throws IOException // appends one counter row per call; returns false once rowsPerSSTable rows exist
+                {
+                    if (written >= rowsPerSSTable) // '>=' so exactly rowsPerSSTable rows are written, matching the expectedSize given to write()
+                        return false;
+
+                    DecoratedKey key = Util.dk(String.format("%020d", written));
+                    ColumnFamily cf = ColumnFamily.create(TABLE1, cfName);
+                    for (int i = 0; i < colsPerRow; i++)
+                        cf.addColumn(createCounterColumn(String.valueOf(i)));
+                    writer.append(key, cf);
+                    written++;
+                    return true;
+                }
+            });
+
+            // whack the index to trigger the recover
+            
FileUtils.deleteWithConfirm(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX));
+            
FileUtils.deleteWithConfirm(sstable.descriptor.filenameFor(Component.FILTER));
+
+            sstables.add(sstable);
+        }
+
+        // give garbage collection a bit of time to catch up
+        Thread.sleep(1000);
+
+        long start = System.currentTimeMillis();
+
+        for (SSTableReader sstable : sstables)
+            CompactionManager.instance.submitSSTableBuild(sstable.descriptor, 
OperationType.AES).get();
+
+        System.out.println(String.format("%s: sstables=%d rowsper=%d 
colsper=%d: %d ms",
+                                         this.getClass().getName(),
+                                         sstableCount,
+                                         rowsPerSSTable,
+                                         colsPerRow,
+                                         System.currentTimeMillis() - start));
+    }
+
+    protected CounterColumn createCounterColumn(String name)
+    {
+        byte[] context = Util.concatByteArrays(
+            FBUtilities.getLocalAddress().getAddress(), 
FBUtilities.toByteArray(9L), FBUtilities.toByteArray(3L),
+            FBUtilities.toByteArray(2),  FBUtilities.toByteArray(4L), 
FBUtilities.toByteArray(2L),
+            FBUtilities.toByteArray(4),  FBUtilities.toByteArray(3L), 
FBUtilities.toByteArray(3L),
+            FBUtilities.toByteArray(8),  FBUtilities.toByteArray(2L), 
FBUtilities.toByteArray(4L)
+        );
+        return new CounterColumn(ByteBufferUtil.bytes(name), 
ByteBuffer.wrap(context), 0L);
+    }
 }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java?rev=1080703&r1=1080702&r2=1080703&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java Fri Mar 11 17:54:08 2011
@@ -30,6 +30,8 @@ import org.apache.cassandra.db.Decorated
 import org.apache.cassandra.db.IColumn;
 import org.apache.cassandra.io.util.DataOutputBuffer;
 
+import org.apache.cassandra.Util;
+
 public class SSTableUtils
 {
     // first configured table and cf
@@ -130,15 +132,23 @@ public class SSTableUtils
 
         public SSTableReader write(Map<String, ColumnFamily> entries) throws 
IOException
         {
-            Map<ByteBuffer, ByteBuffer> map = new HashMap<ByteBuffer, 
ByteBuffer>();
+            SortedMap<DecoratedKey, ColumnFamily> sorted = new 
TreeMap<DecoratedKey, ColumnFamily>();
             for (Map.Entry<String, ColumnFamily> entry : entries.entrySet())
+                sorted.put(Util.dk(entry.getKey()), entry.getValue());
+
+            final Iterator<Map.Entry<DecoratedKey, ColumnFamily>> iter = 
sorted.entrySet().iterator();
+            return write(sorted.size(), new Appender()
             {
-                DataOutputBuffer buffer = new DataOutputBuffer();
-                
ColumnFamily.serializer().serializeWithIndexes(entry.getValue(), buffer);
-                map.put(ByteBuffer.wrap(entry.getKey().getBytes()),
-                        ByteBuffer.wrap(buffer.asByteArray()));
-            }
-            return writeRaw(map);
+                @Override
+                public boolean append(SSTableWriter writer) throws IOException
+                {
+                    if (!iter.hasNext())
+                        return false;
+                    Map.Entry<DecoratedKey, ColumnFamily> entry = iter.next();
+                    writer.append(entry.getKey(), entry.getValue());
+                    return true;
+                }
+            });
         }
 
         /**
@@ -148,16 +158,42 @@ public class SSTableUtils
         {
             File datafile = (dest == null) ? tempSSTableFile(ksname, cfname, 
generation) : new File(dest.filenameFor(Component.DATA));
             SSTableWriter writer = new 
SSTableWriter(datafile.getAbsolutePath(), entries.size());
-            SortedMap<DecoratedKey, ByteBuffer> sortedEntries = new 
TreeMap<DecoratedKey, ByteBuffer>();
+            SortedMap<DecoratedKey, ByteBuffer> sorted = new 
TreeMap<DecoratedKey, ByteBuffer>();
             for (Map.Entry<ByteBuffer, ByteBuffer> entry : entries.entrySet())
-                
sortedEntries.put(writer.partitioner.decorateKey(entry.getKey()), 
entry.getValue());
-            for (Map.Entry<DecoratedKey, ByteBuffer> entry : 
sortedEntries.entrySet())
-                writer.append(entry.getKey(), entry.getValue());
+                sorted.put(writer.partitioner.decorateKey(entry.getKey()), 
entry.getValue());
+            final Iterator<Map.Entry<DecoratedKey, ByteBuffer>> iter = 
sorted.entrySet().iterator();
+            return write(sorted.size(), new Appender()
+            {
+                @Override
+                public boolean append(SSTableWriter writer) throws IOException
+                {
+                    if (!iter.hasNext())
+                        return false;
+                    Map.Entry<DecoratedKey, ByteBuffer> entry = iter.next();
+                    writer.append(entry.getKey(), entry.getValue());
+                    return true;
+                }
+            });
+        }
+
+        public SSTableReader write(int expectedSize, Appender appender) throws 
IOException // Writes rows produced by appender into a new sstable and returns a reader over it; expectedSize goes to the SSTableWriter constructor.
+        {
+            File datafile = (dest == null) ? tempSSTableFile(ksname, cfname, 
generation) : new File(dest.filenameFor(Component.DATA));
+            SSTableWriter writer = new 
SSTableWriter(datafile.getAbsolutePath(), expectedSize);
+            long start = System.currentTimeMillis(); // NOTE(review): never read in this method; looks like leftover timing code
+            while (appender.append(writer)) { /* pass */ } // the Appender writes rows until it returns false
             SSTableReader reader = writer.closeAndOpenReader();
+            // mark all components for removal
             if (cleanup)
-                for (Component comp : reader.components)
-                    new 
File(reader.descriptor.filenameFor(comp)).deleteOnExit();
+                for (Component component : reader.components)
+                    new 
File(reader.descriptor.filenameFor(component)).deleteOnExit();
             return reader;
         }
     }
+
+    public static abstract class Appender
+    {
+        /** Called with an open writer until it returns false. */
+        public abstract boolean append(SSTableWriter writer) throws 
IOException;
+    }
 }

Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java?rev=1080703&r1=1080702&r2=1080703&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java Fri Mar 11 17:54:08 2011
@@ -662,4 +662,56 @@ public class BufferedRandomAccessFileTes
         fout.close();
         return f;
     }
+
+    public void assertSetLength(BufferedRandomAccessFile file, long length) 
throws IOException
+    {
+        assert file.getFilePointer() == length;
+        assert file.length() == file.getFilePointer();
+        assert file.getChannel().size() == file.length();
+    }
+
+    @Test
+    public void testSetLength() throws IOException
+    {
+        File tmpFile = File.createTempFile("set_length", "bin");
+        BufferedRandomAccessFile file = new BufferedRandomAccessFile(tmpFile, 
"rw", 8*1024*1024);
+
+        // test that data in buffer is truncated
+        file.writeLong(1L);
+        file.writeLong(2L);
+        file.writeLong(3L);
+        file.writeLong(4L);
+        file.setLength(16L);
+        assertSetLength(file, 16L);
+
+        // seek back and truncate within file
+        file.writeLong(3L);
+        file.seek(8L);
+        file.setLength(24L);
+        assertSetLength(file, 24L);
+
+        // seek back and truncate past end of file
+        file.setLength(64L);
+        assertSetLength(file, 64L);
+
+        // make sure file is consistent after sync
+        file.sync();
+        assertSetLength(file, 64L);
+    }
+
+    @Test (expected=IllegalArgumentException.class)
+    public void testSetNegativeLength() throws IOException, 
IllegalArgumentException
+    {
+        File tmpFile = File.createTempFile("set_negative_length", "bin");
+        BufferedRandomAccessFile file = new BufferedRandomAccessFile(tmpFile, 
"rw", 8*1024*1024);
+        file.setLength(-8L);
+    }
+
+    @Test (expected=IOException.class)
+    public void testSetLengthDuringReadMode() throws IOException
+    {
+        File tmpFile = File.createTempFile("set_length_during_read_mode", 
"bin");
+        BufferedRandomAccessFile file = new BufferedRandomAccessFile(tmpFile, 
"r", 8*1024*1024);
+        file.setLength(4L);
+    }
 }


Reply via email to