Author: brandonwilliams Date: Thu Dec 30 18:20:37 2010 New Revision: 1053960
URL: http://svn.apache.org/viewvc?rev=1053960&view=rev Log: Fix CompactionManager regression from CASSANDRA-1916 and add a better test and more docs to prevent it in the future. Patch by jbellis, reviewed by brandonwilliams for CASSANDRA-1922 Modified: cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/CompactionManager.java cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/io/AbstractCompactedRow.java cassandra/branches/cassandra-0.7.0/test/unit/org/apache/cassandra/db/CleanupTest.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/AbstractCompactedRow.java cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/CleanupTest.java Modified: cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/CompactionManager.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1053960&r1=1053959&r2=1053960&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/CompactionManager.java (original) +++ cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/db/CompactionManager.java Thu Dec 30 18:20:37 2010 @@ -798,6 +798,7 @@ public class CompactionManager implement public void write(DataOutput out) throws IOException { + out.writeLong(row.dataSize); row.echoData(out); } Modified: cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/io/AbstractCompactedRow.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/io/AbstractCompactedRow.java?rev=1053960&r1=1053959&r2=1053960&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/io/AbstractCompactedRow.java (original) +++ 
cassandra/branches/cassandra-0.7.0/src/java/org/apache/cassandra/io/AbstractCompactedRow.java Thu Dec 30 18:20:37 2010 @@ -41,11 +41,23 @@ public abstract class AbstractCompactedR this.key = key; } + /** + * write the row (size + column index + filter + column data, but NOT row key) to @param out + */ public abstract void write(DataOutput out) throws IOException; - + + /** + * update @param digest with the data bytes of the row (not including row key or row size) + */ public abstract void update(MessageDigest digest); + /** + * @return true if there are no columns in the row AND there are no row-level tombstones to be preserved + */ public abstract boolean isEmpty(); + /** + * @return the number of columns in the row + */ public abstract int columnCount(); } Modified: cassandra/branches/cassandra-0.7.0/test/unit/org/apache/cassandra/db/CleanupTest.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7.0/test/unit/org/apache/cassandra/db/CleanupTest.java?rev=1053960&r1=1053959&r2=1053960&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7.0/test/unit/org/apache/cassandra/db/CleanupTest.java (original) +++ cassandra/branches/cassandra-0.7.0/test/unit/org/apache/cassandra/db/CleanupTest.java Thu Dec 30 18:20:37 2010 @@ -33,6 +33,7 @@ import org.junit.Test; import org.apache.cassandra.CleanupHelper; import org.apache.cassandra.Util; +import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.db.columniterator.IdentityQueryFilter; import org.apache.cassandra.db.filter.IFilter; import org.apache.cassandra.db.filter.QueryFilter; @@ -49,9 +50,10 @@ import org.apache.cassandra.utils.FBUtil public class CleanupTest extends CleanupHelper { - public static final int LOOPS = 800; + public static final int LOOPS = 200; public static final String TABLE1 = "Keyspace1"; public static final String CF1 = "Indexed1"; + public static final String CF2 = "Standard1"; 
public static final ByteBuffer COLUMN = ByteBuffer.wrap("birthdate".getBytes()); public static final ByteBuffer VALUE = ByteBuffer.allocate(8); static @@ -61,34 +63,59 @@ public class CleanupTest extends Cleanup } @Test - public void testCleanup() throws IOException, ExecutionException, InterruptedException + public void testCleanup() throws IOException, ExecutionException, InterruptedException, ConfigurationException { + StorageService.instance.initServer(); + Table table = Table.open(TABLE1); + ColumnFamilyStore cfs = table.getColumnFamilyStore(CF2); - ColumnFamilyStore cfs = table.getColumnFamilyStore(CF1); + List<Row> rows; + + // insert data and verify we get it back w/ range query fillCF(cfs, LOOPS); + rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter()); + assertEquals(LOOPS, rows.size()); + + // with one token in the ring, owned by the local node, cleanup should be a no-op + CompactionManager.instance.performCleanup(cfs); + + // check data is still there + rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter()); + assertEquals(LOOPS, rows.size()); + } + @Test + public void testCleanupWithIndexes() throws IOException, ExecutionException, InterruptedException + { + Table table = Table.open(TABLE1); + ColumnFamilyStore cfs = table.getColumnFamilyStore(CF1); assertEquals(cfs.getIndexedColumns().iterator().next(), COLUMN); - ColumnFamilyStore cfi = cfs.getIndexedColumnFamilyStore(COLUMN); + List<Row> rows; + + // insert data and verify we get it back w/ range query + fillCF(cfs, LOOPS); + rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter()); + assertEquals(LOOPS, rows.size()); + ColumnFamilyStore cfi = cfs.getIndexedColumnFamilyStore(COLUMN); assertTrue(cfi.isIndexBuilt()); + // verify we get it back w/ index query too IndexExpression expr = new IndexExpression(COLUMN, IndexOperator.EQ, VALUE); IndexClause clause = new IndexClause(Arrays.asList(expr), 
FBUtilities.EMPTY_BYTE_BUFFER, Integer.MAX_VALUE); IFilter filter = new IdentityQueryFilter(); IPartitioner p = StorageService.getPartitioner(); Range range = new Range(p.getMinimumToken(), p.getMinimumToken()); - List<Row> rows = table.getColumnFamilyStore(CF1).scan(clause, range, filter); - + rows = table.getColumnFamilyStore(CF1).scan(clause, range, filter); assertEquals(LOOPS, rows.size()); + // nuke our token so cleanup will remove everything TokenMetadata tmd = StorageService.instance.getTokenMetadata(); + tmd.clearUnsafe(); + assert StorageService.instance.getLocalRanges(TABLE1).isEmpty(); - assertNotNull(tmd); - assertEquals(0, tmd.getTokenToEndpointMap().size()); - - // Since this test has no ring cleanup will remove all CompactionManager.instance.performCleanup(cfs); // row data should be gone @@ -98,27 +125,25 @@ public class CleanupTest extends Cleanup // not only should it be gone but there should be no data on disk, not even tombstones assert cfs.getSSTables().isEmpty(); - // 2ary indexes should result in no results, but + // 2ary indexes should result in no results, too (although tombstones won't be gone until compacted) rows = cfs.scan(clause, range, filter); assertEquals(0, rows.size()); } - protected void fillCF(ColumnFamilyStore store, int rowsPerSSTable) throws ExecutionException, InterruptedException, IOException + protected void fillCF(ColumnFamilyStore cfs, int rowsPerSSTable) throws ExecutionException, InterruptedException, IOException { CompactionManager.instance.disableAutoCompaction(); for (int i = 0; i < rowsPerSSTable; i++) { String key = String.valueOf(i); - // create a row and update the birthdate value, test that the index query fetches the new version RowMutation rm; rm = new RowMutation(TABLE1, ByteBufferUtil.bytes(key)); - rm.add(new QueryPath(CF1, null, COLUMN), VALUE, System.currentTimeMillis()); - rm.apply(); + rm.add(new QueryPath(cfs.getColumnFamilyName(), null, COLUMN), VALUE, System.currentTimeMillis()); + rm.applyUnsafe(); 
} - store.forceBlockingFlush(); - store.buildSecondaryIndexes(store.getSSTables(), store.getIndexedColumns()); + cfs.forceBlockingFlush(); } } Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1053960&r1=1053959&r2=1053960&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/db/CompactionManager.java Thu Dec 30 18:20:37 2010 @@ -798,6 +798,7 @@ public class CompactionManager implement public void write(DataOutput out) throws IOException { + out.writeLong(row.dataSize); row.echoData(out); } Modified: cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/AbstractCompactedRow.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/AbstractCompactedRow.java?rev=1053960&r1=1053959&r2=1053960&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/AbstractCompactedRow.java (original) +++ cassandra/branches/cassandra-0.7/src/java/org/apache/cassandra/io/AbstractCompactedRow.java Thu Dec 30 18:20:37 2010 @@ -41,11 +41,23 @@ public abstract class AbstractCompactedR this.key = key; } + /** + * write the row (size + column index + filter + column data, but NOT row key) to @param out + */ public abstract void write(DataOutput out) throws IOException; - + + /** + * update @param digest with the data bytes of the row (not including row key or row size) + */ public abstract void update(MessageDigest digest); + /** + * @return true if there are no columns in the row AND there are no row-level tombstones to be preserved + */ public abstract 
boolean isEmpty(); + /** + * @return the number of columns in the row + */ public abstract int columnCount(); } Modified: cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/CleanupTest.java URL: http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/CleanupTest.java?rev=1053960&r1=1053959&r2=1053960&view=diff ============================================================================== --- cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/CleanupTest.java (original) +++ cassandra/branches/cassandra-0.7/test/unit/org/apache/cassandra/db/CleanupTest.java Thu Dec 30 18:20:37 2010 @@ -33,6 +33,7 @@ import org.junit.Test; import org.apache.cassandra.CleanupHelper; import org.apache.cassandra.Util; +import org.apache.cassandra.config.ConfigurationException; import org.apache.cassandra.db.columniterator.IdentityQueryFilter; import org.apache.cassandra.db.filter.IFilter; import org.apache.cassandra.db.filter.QueryFilter; @@ -49,9 +50,10 @@ import org.apache.cassandra.utils.FBUtil public class CleanupTest extends CleanupHelper { - public static final int LOOPS = 800; + public static final int LOOPS = 200; public static final String TABLE1 = "Keyspace1"; public static final String CF1 = "Indexed1"; + public static final String CF2 = "Standard1"; public static final ByteBuffer COLUMN = ByteBuffer.wrap("birthdate".getBytes()); public static final ByteBuffer VALUE = ByteBuffer.allocate(8); static @@ -61,34 +63,59 @@ public class CleanupTest extends Cleanup } @Test - public void testCleanup() throws IOException, ExecutionException, InterruptedException + public void testCleanup() throws IOException, ExecutionException, InterruptedException, ConfigurationException { + StorageService.instance.initServer(); + Table table = Table.open(TABLE1); + ColumnFamilyStore cfs = table.getColumnFamilyStore(CF2); - ColumnFamilyStore cfs = table.getColumnFamilyStore(CF1); + List<Row> rows; + + // insert data and verify we 
get it back w/ range query fillCF(cfs, LOOPS); + rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter()); + assertEquals(LOOPS, rows.size()); + + // with one token in the ring, owned by the local node, cleanup should be a no-op + CompactionManager.instance.performCleanup(cfs); + + // check data is still there + rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter()); + assertEquals(LOOPS, rows.size()); + } + @Test + public void testCleanupWithIndexes() throws IOException, ExecutionException, InterruptedException + { + Table table = Table.open(TABLE1); + ColumnFamilyStore cfs = table.getColumnFamilyStore(CF1); assertEquals(cfs.getIndexedColumns().iterator().next(), COLUMN); - ColumnFamilyStore cfi = cfs.getIndexedColumnFamilyStore(COLUMN); + List<Row> rows; + + // insert data and verify we get it back w/ range query + fillCF(cfs, LOOPS); + rows = cfs.getRangeSlice(null, Util.range("", ""), 1000, new IdentityQueryFilter()); + assertEquals(LOOPS, rows.size()); + ColumnFamilyStore cfi = cfs.getIndexedColumnFamilyStore(COLUMN); assertTrue(cfi.isIndexBuilt()); + // verify we get it back w/ index query too IndexExpression expr = new IndexExpression(COLUMN, IndexOperator.EQ, VALUE); IndexClause clause = new IndexClause(Arrays.asList(expr), FBUtilities.EMPTY_BYTE_BUFFER, Integer.MAX_VALUE); IFilter filter = new IdentityQueryFilter(); IPartitioner p = StorageService.getPartitioner(); Range range = new Range(p.getMinimumToken(), p.getMinimumToken()); - List<Row> rows = table.getColumnFamilyStore(CF1).scan(clause, range, filter); - + rows = table.getColumnFamilyStore(CF1).scan(clause, range, filter); assertEquals(LOOPS, rows.size()); + // nuke our token so cleanup will remove everything TokenMetadata tmd = StorageService.instance.getTokenMetadata(); + tmd.clearUnsafe(); + assert StorageService.instance.getLocalRanges(TABLE1).isEmpty(); - assertNotNull(tmd); - assertEquals(0, tmd.getTokenToEndpointMap().size()); - - // 
Since this test has no ring cleanup will remove all CompactionManager.instance.performCleanup(cfs); // row data should be gone @@ -98,27 +125,25 @@ public class CleanupTest extends Cleanup // not only should it be gone but there should be no data on disk, not even tombstones assert cfs.getSSTables().isEmpty(); - // 2ary indexes should result in no results, but + // 2ary indexes should result in no results, too (although tombstones won't be gone until compacted) rows = cfs.scan(clause, range, filter); assertEquals(0, rows.size()); } - protected void fillCF(ColumnFamilyStore store, int rowsPerSSTable) throws ExecutionException, InterruptedException, IOException + protected void fillCF(ColumnFamilyStore cfs, int rowsPerSSTable) throws ExecutionException, InterruptedException, IOException { CompactionManager.instance.disableAutoCompaction(); for (int i = 0; i < rowsPerSSTable; i++) { String key = String.valueOf(i); - // create a row and update the birthdate value, test that the index query fetches the new version RowMutation rm; rm = new RowMutation(TABLE1, ByteBufferUtil.bytes(key)); - rm.add(new QueryPath(CF1, null, COLUMN), VALUE, System.currentTimeMillis()); - rm.apply(); + rm.add(new QueryPath(cfs.getColumnFamilyName(), null, COLUMN), VALUE, System.currentTimeMillis()); + rm.applyUnsafe(); } - store.forceBlockingFlush(); - store.buildSecondaryIndexes(store.getSSTables(), store.getIndexedColumns()); + cfs.forceBlockingFlush(); } }
