Updated Branches: refs/heads/trunk 3ca160ea7 -> a9bd531bc
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 8defb10..43ea5a8 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@ -40,6 +40,9 @@ public class SSTableWriter extends SSTable
 {
     private static final Logger logger = LoggerFactory.getLogger(SSTableWriter.class);
 
+    // not very random, but the only value that can't be mistaken for a legal column-name length
+    public static final int END_OF_ROW = 0x0000;
+
     private IndexWriter iwriter;
     private SegmentedFile.Builder dbuilder;
     private final SequentialWriter dataFile;
@@ -135,7 +138,7 @@ public class SSTableWriter extends SSTable
         return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
     }
 
-    private RowIndexEntry afterAppend(DecoratedKey decoratedKey, long dataPosition, DeletionInfo delInfo, ColumnIndex index)
+    private void afterAppend(DecoratedKey decoratedKey, long dataPosition, RowIndexEntry index)
     {
         lastWrittenKey = decoratedKey;
         last = lastWrittenKey;
@@ -144,30 +147,31 @@ public class SSTableWriter extends SSTable
         if (logger.isTraceEnabled())
             logger.trace("wrote " + decoratedKey + " at " + dataPosition);
 
-        // range tombstones are part of the Atoms we write as the row contents, so RIE only gets row-level tombstones
-        RowIndexEntry entry = RowIndexEntry.create(dataPosition, delInfo.getTopLevelDeletion(), index);
-        iwriter.append(decoratedKey, entry);
+        iwriter.append(decoratedKey, index);
         dbuilder.addPotentialBoundary(dataPosition);
-        return entry;
     }
 
+    /**
+     * @param row
+     * @return null if the row was compacted away entirely; otherwise, the PK index entry for this row
+     */
     public RowIndexEntry append(AbstractCompactedRow row)
     {
         long currentPosition = beforeAppend(row.key);
+        RowIndexEntry entry;
         try
         {
-            ByteBufferUtil.writeWithShortLength(row.key.key, dataFile.stream);
-            long dataStart = dataFile.getFilePointer();
-            long dataSize = row.write(dataFile.stream);
-            assert dataSize == dataFile.getFilePointer() - (dataStart + 8)
-                   : "incorrect row data size " + dataSize + " written to " + dataFile.getPath() + "; correct is " + (dataFile.getFilePointer() - (dataStart + 8));
+            entry = row.write(currentPosition, dataFile.stream);
+            if (entry == null)
+                return null;
         }
         catch (IOException e)
         {
             throw new FSWriteError(e, dataFile.getPath());
         }
         sstableMetadataCollector.update(dataFile.getFilePointer() - currentPosition, row.columnStats());
-        return afterAppend(row.key, currentPosition, row.deletionInfo(), row.index());
+        afterAppend(row.key, currentPosition, entry);
+        return entry;
     }
 
     public void append(DecoratedKey decoratedKey, ColumnFamily cf)
@@ -175,25 +179,8 @@ public class SSTableWriter extends SSTable
         long startPosition = beforeAppend(decoratedKey);
         try
         {
-            ByteBufferUtil.writeWithShortLength(decoratedKey.key, dataFile.stream);
-
-            // Since the columnIndex may insert RangeTombstone marker, computing
-            // the size of the data is tricky.
-            DataOutputBuffer buffer = new DataOutputBuffer();
-
-            // build column index && write columns
-            ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, decoratedKey.key, buffer);
-            ColumnIndex index = builder.build(cf);
-
-            TypeSizes typeSizes = TypeSizes.NATIVE;
-            long delSize = DeletionTime.serializer.serializedSize(cf.deletionInfo().getTopLevelDeletion(), typeSizes);
-            dataFile.stream.writeLong(buffer.getLength() + delSize + typeSizes.sizeof(0));
-
-            // Write deletion infos + column count
-            DeletionInfo.serializer().serializeForSSTable(cf.deletionInfo(), dataFile.stream);
-            dataFile.stream.writeInt(builder.writtenAtomCount());
-            dataFile.stream.write(buffer.getData(), 0, buffer.getLength());
-            afterAppend(decoratedKey, startPosition, cf.deletionInfo(), index);
+            RowIndexEntry entry = rawAppend(cf, startPosition, decoratedKey, dataFile.stream);
+            afterAppend(decoratedKey, startPosition, entry);
         }
         catch (IOException e)
         {
@@ -202,38 +189,26 @@ public class SSTableWriter extends SSTable
         sstableMetadataCollector.update(dataFile.getFilePointer() - startPosition, cf.getColumnStats());
     }
 
+    public static RowIndexEntry rawAppend(ColumnFamily cf, long startPosition, DecoratedKey key, DataOutput out) throws IOException
+    {
+        assert cf.getColumnCount() > 0 || cf.isMarkedForDelete();
+
+        ColumnIndex.Builder builder = new ColumnIndex.Builder(cf, key.key, out);
+        ColumnIndex index = builder.build(cf);
+
+        out.writeShort(END_OF_ROW);
+        return RowIndexEntry.create(startPosition, cf.deletionInfo().getTopLevelDeletion(), index);
+    }
+
     /**
      * @throws IOException if a read from the DataInput fails
      * @throws FSWriteError if a write to the dataFile fails
     */
-    public long appendFromStream(DecoratedKey key, CFMetaData metadata, long dataSize, DataInput in) throws IOException
+    public long appendFromStream(DecoratedKey key, CFMetaData metadata, DataInput in) throws IOException
     {
         long currentPosition = beforeAppend(key);
-        long dataStart;
-        try
-        {
-            ByteBufferUtil.writeWithShortLength(key.key, dataFile.stream);
-            dataStart = dataFile.getFilePointer();
-            // write row size
-            dataFile.stream.writeLong(dataSize);
-        }
-        catch (IOException e)
-        {
-            throw new FSWriteError(e, dataFile.getPath());
-        }
 
         DeletionInfo deletionInfo = DeletionInfo.serializer().deserializeFromSSTable(in, descriptor.version);
-        int columnCount = in.readInt();
-
-        try
-        {
-            DeletionInfo.serializer().serializeForSSTable(deletionInfo, dataFile.stream);
-            dataFile.stream.writeInt(columnCount);
-        }
-        catch (IOException e)
-        {
-            throw new FSWriteError(e, dataFile.getPath());
-        }
 
         // deserialize each column to obtain maxTimestamp and immediately serialize it.
         long minTimestamp = Long.MAX_VALUE;
@@ -245,42 +220,45 @@ public class SSTableWriter extends SSTable
         ColumnIndex.Builder columnIndexer = new ColumnIndex.Builder(cf, key.key, dataFile.stream, true);
         OnDiskAtom.Serializer atomSerializer = Column.onDiskSerializer();
-        for (int i = 0; i < columnCount; i++)
+        try
         {
-            // deserialize column with PRESERVE_SIZE because we've written the dataSize based on the
-            // data size received, so we must reserialize the exact same data
-            OnDiskAtom atom = atomSerializer.deserializeFromSSTable(in, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, Descriptor.Version.CURRENT);
-            if (atom instanceof CounterColumn)
-                atom = ((CounterColumn) atom).markDeltaToBeCleared();
-
-            int deletionTime = atom.getLocalDeletionTime();
-            if (deletionTime < Integer.MAX_VALUE)
+            while (true)
             {
-                tombstones.update(deletionTime);
-            }
-            minTimestamp = Math.min(minTimestamp, atom.minTimestamp());
-            maxTimestamp = Math.max(maxTimestamp, atom.maxTimestamp());
-            maxLocalDeletionTime = Math.max(maxLocalDeletionTime, atom.getLocalDeletionTime());
+                // deserialize column with PRESERVE_SIZE because we've written the dataSize based on the
+                // data size received, so we must reserialize the exact same data
+                OnDiskAtom atom = atomSerializer.deserializeFromSSTable(in, ColumnSerializer.Flag.PRESERVE_SIZE, Integer.MIN_VALUE, Descriptor.Version.CURRENT);
+                if (atom == null)
+                    break;
+                if (atom instanceof CounterColumn)
+                    atom = ((CounterColumn) atom).markDeltaToBeCleared();
+
+                int deletionTime = atom.getLocalDeletionTime();
+                if (deletionTime < Integer.MAX_VALUE)
+                {
+                    tombstones.update(deletionTime);
+                }
+                minTimestamp = Math.min(minTimestamp, atom.minTimestamp());
+                maxTimestamp = Math.max(maxTimestamp, atom.maxTimestamp());
+                maxLocalDeletionTime = Math.max(maxLocalDeletionTime, atom.getLocalDeletionTime());
 
-            try
-            {
                 columnIndexer.add(atom); // This write the atom on disk too
             }
-            catch (IOException e)
-            {
-                throw new FSWriteError(e, dataFile.getPath());
-            }
+
+            columnIndexer.finish();
+            dataFile.stream.writeShort(END_OF_ROW);
+        }
+        catch (IOException e)
+        {
+            throw new FSWriteError(e, dataFile.getPath());
         }
 
-        assert dataSize == dataFile.getFilePointer() - (dataStart + 8)
-               : "incorrect row data size " + dataSize + " written to " + dataFile.getPath() + "; correct is " + (dataFile.getFilePointer() - (dataStart + 8));
         sstableMetadataCollector.updateMinTimestamp(minTimestamp);
         sstableMetadataCollector.updateMaxTimestamp(maxTimestamp);
         sstableMetadataCollector.updateMaxLocalDeletionTime(maxLocalDeletionTime);
         sstableMetadataCollector.addRowSize(dataFile.getFilePointer() - currentPosition);
-        sstableMetadataCollector.addColumnCount(columnCount);
+        sstableMetadataCollector.addColumnCount(columnIndexer.writtenAtomCount());
         sstableMetadataCollector.mergeTombstoneHistogram(tombstones);
-        afterAppend(key, currentPosition, deletionInfo, columnIndexer.build());
+        afterAppend(key, currentPosition, RowIndexEntry.create(currentPosition, deletionInfo.getTopLevelDeletion(), columnIndexer.build()));
         return currentPosition;
     }
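The heart of this change is the framing switch: instead of prefixing each row with a data-size long and a column count, the writer emits the atoms and then a two-byte END_OF_ROW (0x0000) terminator. Since every serialized atom begins with a non-zero column-name length, a zero short is unambiguous. A minimal sketch of the idea, assuming a simplified column layout (short name length, name bytes, int value length, value bytes) rather than Cassandra's real atom serialization:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    public class EndOfRowSketch
    {
        // mirrors SSTableWriter.END_OF_ROW: zero can never be a legal column-name length
        public static final int END_OF_ROW = 0x0000;

        // write each column as <short name length><name><int value length><value>, then the terminator
        static void writeRow(DataOutput out, byte[][] names, byte[][] values) throws IOException
        {
            for (int i = 0; i < names.length; i++)
            {
                out.writeShort(names[i].length); // always > 0 for a real column
                out.write(names[i]);
                out.writeInt(values[i].length);
                out.write(values[i]);
            }
            out.writeShort(END_OF_ROW); // no up-front size or column count required
        }

        // read columns until the zero-length sentinel, the way deserializeFromSSTable now returns null
        static int countColumns(DataInput in) throws IOException
        {
            int count = 0;
            while (true)
            {
                int nameLength = in.readUnsignedShort();
                if (nameLength == END_OF_ROW)
                    return count;             // end of row reached
                in.skipBytes(nameLength);     // column name
                in.skipBytes(in.readInt());   // column value
                count++;
            }
        }
    }

One consequence is visible throughout the diff above: the column count is no longer known up front, which is why appendFromStream deserializes atoms until the serializer returns null and why the metadata collector now asks the ColumnIndex.Builder for writtenAtomCount().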
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
index e1aaa56..92f5c7f 100644
--- a/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
+++ b/src/java/org/apache/cassandra/streaming/IncomingStreamReader.java
@@ -27,15 +27,14 @@ import com.google.common.base.Throwables;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.ning.compress.lzf.LZFInputStream;
 import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.ColumnFamily;
 import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.ColumnSerializer;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Table;
 import org.apache.cassandra.db.compaction.CompactionController;
-import org.apache.cassandra.db.compaction.PrecompactedRow;
-import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.sstable.SSTableReader;
+import org.apache.cassandra.io.sstable.SSTableWriter;
 import org.apache.cassandra.io.util.FileUtils;
 import org.apache.cassandra.metrics.StreamingMetrics;
 import org.apache.cassandra.service.StorageService;
@@ -43,7 +42,6 @@ import org.apache.cassandra.streaming.compress.CompressedInputStream;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.BytesReadTracker;
 import org.apache.cassandra.utils.Pair;
-import com.ning.compress.lzf.LZFInputStream;
 
 public class IncomingStreamReader
 {
@@ -157,28 +155,11 @@ public class IncomingStreamReader
             while (bytesRead < length)
             {
                 in.reset(0);
+
                 key = StorageService.getPartitioner().decorateKey(ByteBufferUtil.readWithShortLength(in));
-                long dataSize = in.readLong();
-
-                if (cfs.containsCachedRow(key) && remoteFile.type == OperationType.AES && dataSize <= DatabaseDescriptor.getInMemoryCompactionLimit())
-                {
-                    // need to update row cache
-                    // Note: Because we won't just echo the columns, there is no need to use the PRESERVE_SIZE flag, contrarily to what appendFromStream does below
-                    SSTableIdentityIterator iter = new SSTableIdentityIterator(cfs.metadata, in, localFile.getFilename(), key, 0, dataSize, ColumnSerializer.Flag.FROM_REMOTE);
-                    PrecompactedRow row = new PrecompactedRow(controller, Collections.singletonList(iter));
-                    // We don't expire anything so the row shouldn't be empty
-                    assert !row.isEmpty();
-                    writer.append(row);
-
-                    // update cache
-                    ColumnFamily cf = row.getFullColumnFamily();
-                    cfs.maybeUpdateRowCache(key, cf);
-                }
-                else
-                {
-                    writer.appendFromStream(key, cfs.metadata, dataSize, in);
-                    cfs.invalidateCachedRow(key);
-                }
+                writer.appendFromStream(key, cfs.metadata, in);
+
+                cfs.invalidateCachedRow(key);
 
                 bytesRead += in.getBytesRead();
                 // when compressed, report total bytes of compressed chunks read since remoteFile.size is the sum of chunks transferred
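With the per-row size long gone from the wire format, the receiver can no longer peek at a dataSize to decide whether a row qualifies for the in-memory row-cache path, so that special case is dropped: every streamed row goes through appendFromStream, and any cached copy of the row is invalidated. A condensed sketch of the resulting receive loop, with hypothetical RowSink and RowCache interfaces standing in for SSTableWriter and ColumnFamilyStore, and a simple byte-counting stream in place of BytesReadTracker:

    import java.io.DataInput;
    import java.io.DataInputStream;
    import java.io.FilterInputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class StreamReceiveSketch
    {
        interface RowSink { void appendFromStream(byte[] key, DataInput in) throws IOException; }
        interface RowCache { void invalidate(byte[] key); }

        // stands in for BytesReadTracker: count consumed bytes to know when the file is done
        static class CountingInputStream extends FilterInputStream
        {
            long count;
            CountingInputStream(InputStream in) { super(in); }
            @Override public int read() throws IOException
            {
                int b = in.read();
                if (b >= 0) count++;
                return b;
            }
            @Override public int read(byte[] b, int off, int len) throws IOException
            {
                int n = in.read(b, off, len);
                if (n > 0) count += n;
                return n;
            }
        }

        // receive rows until 'length' bytes are consumed; every row takes the stream-append path now
        static void receive(InputStream rawIn, long length, RowSink writer, RowCache cache) throws IOException
        {
            CountingInputStream counting = new CountingInputStream(rawIn);
            DataInputStream in = new DataInputStream(counting);
            long bytesRead = 0;
            while (bytesRead < length)
            {
                byte[] key = readWithShortLength(in); // 2-byte length + key bytes, as before
                writer.appendFromStream(key, in);     // consumes atoms up to END_OF_ROW
                cache.invalidate(key);                // a stale cached row must not survive
                bytesRead = counting.count;
            }
        }

        static byte[] readWithShortLength(DataInput in) throws IOException
        {
            byte[] b = new byte[in.readUnsignedShort()];
            in.readFully(b);
            return b;
        }
    }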
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/src/java/org/apache/cassandra/tools/SSTableExport.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/SSTableExport.java b/src/java/org/apache/cassandra/tools/SSTableExport.java
index 9e8bb3e..e3b02b1 100644
--- a/src/java/org/apache/cassandra/tools/SSTableExport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableExport.java
@@ -33,6 +33,7 @@ import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.*;
+import org.apache.cassandra.io.util.RandomAccessReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.codehaus.jackson.JsonGenerator;
 import org.codehaus.jackson.map.ObjectMapper;
@@ -103,15 +104,15 @@ public class SSTableExport
      * </ul>
      *
      * @param out The output steam to write data
-     * @param cf to which the metadata belongs
+     * @param deletionInfo
      */
-    private static void writeMeta(PrintStream out, ColumnFamily cf)
+    private static void writeMeta(PrintStream out, DeletionInfo deletionInfo)
     {
-        if (!cf.deletionInfo().equals(DeletionInfo.LIVE))
+        if (!deletionInfo.equals(DeletionInfo.LIVE))
         {
             // begin meta
             writeKey(out, "metadata");
-            writeDeletionInfo(out, cf.deletionInfo().getTopLevelDeletion());
+            writeDeletionInfo(out, deletionInfo.getTopLevelDeletion());
             out.print(",");
         }
     }
@@ -130,22 +131,22 @@ public class SSTableExport
      *
      * @param atoms column iterator
      * @param out output stream
-     * @param comparator columns comparator
      * @param cfMetaData Column Family metadata (to get validator)
      */
-    private static void serializeAtoms(Iterator<OnDiskAtom> atoms, PrintStream out, AbstractType<?> comparator, CFMetaData cfMetaData)
+    private static void serializeAtoms(Iterator<OnDiskAtom> atoms, PrintStream out, CFMetaData cfMetaData)
     {
         while (atoms.hasNext())
         {
-            writeJSON(out, serializeAtom(atoms.next(), comparator, cfMetaData));
+            writeJSON(out, serializeAtom(atoms.next(), cfMetaData));
 
             if (atoms.hasNext())
                 out.print(", ");
         }
     }
 
-    private static List<Object> serializeAtom(OnDiskAtom atom, AbstractType<?> comparator, CFMetaData cfMetaData)
+    private static List<Object> serializeAtom(OnDiskAtom atom, CFMetaData cfMetaData)
     {
+        AbstractType<?> comparator = cfMetaData.comparator;
         if (atom instanceof Column)
         {
             return serializeColumn((Column) atom, comparator, cfMetaData);
@@ -219,21 +220,22 @@ public class SSTableExport
      */
     private static void serializeRow(SSTableIdentityIterator row, DecoratedKey key, PrintStream out)
     {
-        ColumnFamily columnFamily = row.getColumnFamily();
-        CFMetaData cfMetaData = columnFamily.metadata();
-        AbstractType<?> comparator = columnFamily.getComparator();
+        serializeRow(row.getColumnFamily().deletionInfo(), row, row.getColumnFamily().metadata(), key, out);
+    }
 
+    private static void serializeRow(DeletionInfo deletionInfo, Iterator<OnDiskAtom> atoms, CFMetaData metadata, DecoratedKey key, PrintStream out)
+    {
         out.print("{");
         writeKey(out, "key");
         writeJSON(out, bytesToHex(key.key));
         out.print(",");
 
-        writeMeta(out, columnFamily);
+        writeMeta(out, deletionInfo);
 
         writeKey(out, "columns");
         out.print("[");
 
-        serializeAtoms(row, out, comparator, cfMetaData);
+        serializeAtoms(atoms, out, metadata);
 
         out.print("]");
         out.print("}");
@@ -277,10 +279,10 @@ public class SSTableExport
      */
     public static void export(Descriptor desc, PrintStream outs, Collection<String> toExport, String[] excludes) throws IOException
     {
-        SSTableReader reader = SSTableReader.open(desc);
-        SSTableScanner scanner = reader.getScanner();
+        SSTableReader sstable = SSTableReader.open(desc);
+        RandomAccessReader dfile = sstable.openDataReader();
 
-        IPartitioner<?> partitioner = reader.partitioner;
+        IPartitioner<?> partitioner = sstable.partitioner;
 
         if (excludes != null)
             toExport.removeAll(Arrays.asList(excludes));
@@ -301,16 +303,20 @@ public class SSTableExport
 
             lastKey = decoratedKey;
 
-            scanner.seekTo(decoratedKey);
-
-            if (!scanner.hasNext())
+            RowIndexEntry entry = sstable.getPosition(decoratedKey, SSTableReader.Operator.EQ);
+            if (entry == null)
                 continue;
 
-            SSTableIdentityIterator row = (SSTableIdentityIterator) scanner.next();
-            if (!row.getKey().equals(decoratedKey))
-                continue;
+            dfile.seek(entry.position);
+            ByteBufferUtil.readWithShortLength(dfile); // row key
+            if (sstable.descriptor.version.hasRowSizeAndColumnCount)
+                dfile.readLong(); // row size
+            DeletionInfo deletionInfo = DeletionInfo.serializer().deserializeFromSSTable(dfile, sstable.descriptor.version);
+            int columnCount = sstable.descriptor.version.hasRowSizeAndColumnCount ? dfile.readInt() : Integer.MAX_VALUE;
+
+            Iterator<OnDiskAtom> atomIterator = sstable.metadata.getOnDiskIterator(dfile, columnCount, sstable.descriptor.version);
 
-            serializeRow(row, decoratedKey, outs);
+            serializeRow(deletionInfo, atomIterator, sstable.metadata, decoratedKey, outs);
 
             if (i != 0)
                 outs.println(",");
@@ -321,8 +327,6 @@ public class SSTableExport
 
         outs.println("\n]");
         outs.flush();
-
-        scanner.close();
     }
 
     // This is necessary to accommodate the test suite since you cannot open a Reader more
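sstable2json now parses the row header itself, and it must handle both layouts: sstables written before this change carry a row-size long and a column count (descriptor.version.hasRowSizeAndColumnCount), while new ones carry neither and simply end at the sentinel. A sketch of that version branch, using a hypothetical Version holder and a simplified deletion-time encoding (int localDeletionTime followed by long markedForDeleteAt) in place of Cassandra's DeletionInfo serializer:

    import java.io.DataInput;
    import java.io.IOException;

    public class RowHeaderSketch
    {
        static final int NO_COLUMN_COUNT = Integer.MAX_VALUE; // meaning "read until END_OF_ROW"

        // hypothetical stand-in for Descriptor.Version
        static class Version
        {
            final boolean hasRowSizeAndColumnCount;
            Version(boolean b) { hasRowSizeAndColumnCount = b; }
        }

        // position 'file' just past the row header and report how many columns follow:
        // a real count for old sstables, NO_COLUMN_COUNT for sentinel-terminated ones
        static int skipRowHeader(DataInput file, Version version) throws IOException
        {
            int keyLength = file.readUnsignedShort(); // row key, written with short length
            file.skipBytes(keyLength);
            if (version.hasRowSizeAndColumnCount)
                file.readLong();                      // legacy row size, no longer needed
            skipDeletionTime(file);                   // top-level deletion info (simplified)
            return version.hasRowSizeAndColumnCount ? file.readInt() : NO_COLUMN_COUNT;
        }

        // simplified on-disk DeletionTime: int localDeletionTime + long markedForDeleteAt
        static void skipDeletionTime(DataInput file) throws IOException
        {
            file.readInt();
            file.readLong();
        }
    }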
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/SchemaLoader.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java
index 338abb9..015ece0 100644
--- a/test/unit/org/apache/cassandra/SchemaLoader.java
+++ b/test/unit/org/apache/cassandra/SchemaLoader.java
@@ -52,12 +52,12 @@ public class SchemaLoader
     private static Logger logger = LoggerFactory.getLogger(SchemaLoader.class);
 
     @BeforeClass
-    public static void loadSchema() throws IOException
+    public static void loadSchema() throws IOException, ConfigurationException
     {
         loadSchema(false);
     }
 
-    public static void loadSchema(boolean withOldCfIds) throws IOException
+    public static void loadSchema(boolean withOldCfIds) throws IOException, ConfigurationException
     {
         // Cleanup first
         cleanupAndLeaveDirs();
@@ -72,18 +72,13 @@ public class SchemaLoader
             }
         });
-
-        // Migrations aren't happy if gossiper is not started
+        // Migrations aren't happy if gossiper is not started. Even if we don't use migrations though,
+        // some tests now expect us to start gossip for them.
         startGossiper();
-        try
-        {
-            for (KSMetaData ksm : schemaDefinition(withOldCfIds))
-                MigrationManager.announceNewKeyspace(ksm);
-        }
-        catch (ConfigurationException e)
-        {
-            throw new RuntimeException(e);
-        }
+        // if you're messing with low-level sstable stuff, it can be useful to inject the schema directly
+        // Schema.instance.load(schemaDefinition(withOldCfIds));
+        for (KSMetaData ksm : schemaDefinition(withOldCfIds))
+            MigrationManager.announceNewKeyspace(ksm);
     }
 
     public static void startGossiper()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/Util.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/Util.java b/test/unit/org/apache/cassandra/Util.java
index c1e35b4..a14fd99 100644
--- a/test/unit/org/apache/cassandra/Util.java
+++ b/test/unit/org/apache/cassandra/Util.java
@@ -36,7 +36,6 @@ import java.util.concurrent.Future;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.compaction.AbstractCompactionTask;
 import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.compaction.CompactionTask;
 import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
 import org.apache.cassandra.db.filter.IDiskAtomFilter;
 import org.apache.cassandra.db.filter.QueryFilter;
@@ -285,21 +284,4 @@ public class Util
 
         assert thrown : exception.getName() + " not received";
     }
-
-    public static ByteBuffer serializeForSSTable(ColumnFamily cf)
-    {
-        try
-        {
-            ByteArrayOutputStream baos = new ByteArrayOutputStream();
-            DataOutputStream out = new DataOutputStream(baos);
-            DeletionInfo.serializer().serializeForSSTable(cf.deletionInfo(), out);
-            out.writeInt(cf.getColumnCount());
-            new ColumnIndex.Builder(cf, ByteBufferUtil.EMPTY_BYTE_BUFFER, out).build(cf);
-            return ByteBuffer.wrap(baos.toByteArray());
-        }
-        catch (IOException e)
-        {
-            throw new RuntimeException(e);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ScrubTest.java b/test/unit/org/apache/cassandra/db/ScrubTest.java
deleted file mode 100644
index d1635d8..0000000
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ /dev/null
@@ -1,222 +0,0 @@
-package org.apache.cassandra.db;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ExecutionException;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-
-import org.apache.cassandra.OrderedJUnit4ClassRunner;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.Util;
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.commitlog.ReplayPosition;
-import org.apache.cassandra.exceptions.ConfigurationException;
-import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
-import org.apache.cassandra.db.compaction.CompactionManager;
-import org.apache.cassandra.db.filter.NamesQueryFilter;
-import org.apache.cassandra.db.marshal.CompositeType;
-import org.apache.cassandra.io.sstable.Component;
-import org.apache.cassandra.io.sstable.SSTableMetadata;
-import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableWriter;
-import org.apache.cassandra.io.util.FileUtils;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.CLibrary;
-
-import static org.apache.cassandra.Util.column;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-@RunWith(OrderedJUnit4ClassRunner.class)
-public class ScrubTest extends SchemaLoader
-{
-    public String TABLE = "Keyspace1";
-    public String CF = "Standard1";
-    public String CF3 = "Standard2";
-
-    public String copySSTables(String cf) throws IOException
-    {
-        String root = System.getProperty("corrupt-sstable-root");
-        assert root != null;
-        File rootDir = new File(root);
-        assert rootDir.isDirectory();
-
-        File destDir = Directories.create(TABLE, cf).getDirectoryForNewSSTables(1);
-
-        String corruptSSTableName = null;
-
-        FileUtils.createDirectory(destDir);
-        for (File srcFile : rootDir.listFiles())
-        {
-            if (srcFile.getName().equals(".svn"))
-                continue;
-            if (!srcFile.getName().contains(cf))
-                continue;
-            File destFile = new File(destDir, srcFile.getName());
-            CLibrary.createHardLink(srcFile, destFile);
-
-            assert destFile.exists() : destFile.getAbsoluteFile();
-
-            if(destFile.getName().endsWith("Data.db"))
-                corruptSSTableName = destFile.getCanonicalPath();
-        }
-
-        assert corruptSSTableName != null;
-        return corruptSSTableName;
-    }
-
-    @Test
-    public void testScrubOneRow() throws IOException, ExecutionException, InterruptedException, ConfigurationException
-    {
-        CompactionManager.instance.disableAutoCompaction();
-        Table table = Table.open(TABLE);
-        ColumnFamilyStore cfs = table.getColumnFamilyStore(CF);
-
-        List<Row> rows;
-
-        // insert data and verify we get it back w/ range query
-        fillCF(cfs, 1);
-        rows = cfs.getRangeSlice(Util.range("", ""), 1000, new IdentityQueryFilter(), null);
-        assertEquals(1, rows.size());
-
-        CompactionManager.instance.performScrub(cfs);
-
-        // check data is still there
-        rows = cfs.getRangeSlice(Util.range("", ""), 1000, new IdentityQueryFilter(), null);
-        assertEquals(1, rows.size());
-    }
-
-    @Test
-    public void testScrubDeletedRow() throws IOException, ExecutionException, InterruptedException, ConfigurationException
-    {
-        CompactionManager.instance.disableAutoCompaction();
-        Table table = Table.open(TABLE);
-        ColumnFamilyStore cfs = table.getColumnFamilyStore(CF3);
-
-        ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(TABLE, CF3);
-        cf.delete(new DeletionInfo(0, 1)); // expired tombstone
-        RowMutation rm = new RowMutation(TABLE, ByteBufferUtil.bytes(1), cf);
-        rm.applyUnsafe();
-
-        cfs.forceBlockingFlush();
-
-        CompactionManager.instance.performScrub(cfs);
-        assert cfs.getSSTables().isEmpty();
-    }
-
-    @Test
-    public void testScrubMultiRow() throws IOException, ExecutionException, InterruptedException, ConfigurationException
-    {
-        CompactionManager.instance.disableAutoCompaction();
-        Table table = Table.open(TABLE);
-        ColumnFamilyStore cfs = table.getColumnFamilyStore(CF);
-
-        List<Row> rows;
-
-        // insert data and verify we get it back w/ range query
-        fillCF(cfs, 10);
-        rows = cfs.getRangeSlice(Util.range("", ""), 1000, new IdentityQueryFilter(), null);
-        assertEquals(10, rows.size());
-
-        CompactionManager.instance.performScrub(cfs);
-
-        // check data is still there
-        rows = cfs.getRangeSlice(Util.range("", ""), 1000, new IdentityQueryFilter(), null);
-        assertEquals(10, rows.size());
-    }
-
-    @Test
-    public void testScubOutOfOrder() throws Exception
-    {
-        CompactionManager.instance.disableAutoCompaction();
-        Table table = Table.open(TABLE);
-        String columnFamily = "Standard3";
-        ColumnFamilyStore cfs = table.getColumnFamilyStore(columnFamily);
-
-        /*
-         * Code used to generate an outOfOrder sstable. The test for out-of-order key in SSTableWriter must also be commented out.
-         * The test also assumes an ordered partitioner.
-         *
-        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
-        cf.addColumn(new Column(ByteBufferUtil.bytes("someName"), ByteBufferUtil.bytes("someValue"), 0L));
-
-        SSTableWriter writer = new SSTableWriter(cfs.getTempSSTablePath(new File(System.getProperty("corrupt-sstable-root"))),
-                                                 cfs.metadata.getIndexInterval(),
-                                                 cfs.metadata,
-                                                 cfs.partitioner,
-                                                 SSTableMetadata.createCollector());
-        writer.append(Util.dk("a"), cf);
-        writer.append(Util.dk("b"), cf);
-        writer.append(Util.dk("z"), cf);
-        writer.append(Util.dk("c"), cf);
-        writer.append(Util.dk("y"), cf);
-        writer.append(Util.dk("d"), cf);
-        writer.closeAndOpenReader();
-        */
-
-        copySSTables(columnFamily);
-        cfs.loadNewSSTables();
-        assert cfs.getSSTables().size() > 0;
-
-        List<Row> rows;
-        rows = cfs.getRangeSlice(Util.range("", ""), 1000, new IdentityQueryFilter(), null);
-        assert !isRowOrdered(rows) : "'corrupt' test file actually was not";
-
-        CompactionManager.instance.performScrub(cfs);
-        rows = cfs.getRangeSlice(Util.range("", ""), 1000, new IdentityQueryFilter(), null);
-        assert isRowOrdered(rows) : "Scrub failed: " + rows;
-        assert rows.size() == 6 : "Got " + rows.size();
-    }
-
-    private static boolean isRowOrdered(List<Row> rows)
-    {
-        DecoratedKey prev = null;
-        for (Row row : rows)
-        {
-            if (prev != null && prev.compareTo(row.key) > 0)
-                return false;
-            prev = row.key;
-        }
-        return true;
-    }
-
-    protected void fillCF(ColumnFamilyStore cfs, int rowsPerSSTable) throws ExecutionException, InterruptedException, IOException
-    {
-        for (int i = 0; i < rowsPerSSTable; i++)
-        {
-            String key = String.valueOf(i);
-            // create a row and update the birthdate value, test that the index query fetches the new version
-            ColumnFamily cf = TreeMapBackedSortedColumns.factory.create(TABLE, CF);
-            cf.addColumn(column("c1", "1", 1L));
-            cf.addColumn(column("c2", "2", 1L));
-            RowMutation rm = new RowMutation(TABLE, ByteBufferUtil.bytes(key), cf);
-            rm.applyUnsafe();
-        }
-
-        cfs.forceBlockingFlush();
-    }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/db/SerializationsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SerializationsTest.java b/test/unit/org/apache/cassandra/db/SerializationsTest.java
index f87a0f8..267353f 100644
--- a/test/unit/org/apache/cassandra/db/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/db/SerializationsTest.java
@@ -27,6 +27,7 @@ import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.net.CallbackInfo;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessageOut;
@@ -48,7 +49,7 @@ public class SerializationsTest extends AbstractSerializationsTester
     Statics statics = new Statics();
 
     @BeforeClass
-    public static void loadSchema() throws IOException
+    public static void loadSchema() throws IOException, ConfigurationException
     {
         loadSchema(true);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
index f066b9b..9df5d25 100644
--- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
@@ -147,8 +147,7 @@ public class CompactionsTest extends SchemaLoader
 
         // check that the shadowed column is gone
         SSTableReader sstable = cfs.getSSTables().iterator().next();
-        SSTableScanner scanner = sstable.getScanner(new QueryFilter(null, "Super1", new IdentityQueryFilter()));
-        scanner.seekTo(key);
+        SSTableScanner scanner = sstable.getScanner(new QueryFilter(null, "Super1", new IdentityQueryFilter()), key);
         OnDiskAtomIterator iter = scanner.next();
         assertEquals(key, iter.getKey());
         assert iter.next() instanceof RangeTombstone;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
index f1e689e..cce32df 100644
--- a/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
+++ b/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 
+import com.google.common.base.Objects;
 import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
@@ -55,24 +56,24 @@ public class LazilyCompactedRowTest extends SchemaLoader
         // compare eager and lazy compactions
         AbstractCompactionIterable eager = new CompactionIterable(OperationType.UNKNOWN,
                                                                   strategy.getScanners(sstables),
-                                                                  new PreCompactingController(cfs, sstables, gcBefore, false));
+                                                                  new PreCompactingController(cfs, sstables, gcBefore));
         AbstractCompactionIterable lazy = new CompactionIterable(OperationType.UNKNOWN,
                                                                  strategy.getScanners(sstables),
-                                                                 new LazilyCompactingController(cfs, sstables, gcBefore, false));
-        assertBytes(cfs, sstables, eager, lazy);
+                                                                 new LazilyCompactingController(cfs, sstables, gcBefore));
+        assertBytes(cfs, eager, lazy);
 
         // compare eager and parallel-lazy compactions
         eager = new CompactionIterable(OperationType.UNKNOWN,
                                        strategy.getScanners(sstables),
-                                       new PreCompactingController(cfs, sstables, gcBefore, false));
+                                       new PreCompactingController(cfs, sstables, gcBefore));
         AbstractCompactionIterable parallel = new ParallelCompactionIterable(OperationType.UNKNOWN,
                                                                              strategy.getScanners(sstables),
                                                                              new CompactionController(cfs, new HashSet<SSTableReader>(sstables), gcBefore),
                                                                              0);
-        assertBytes(cfs, sstables, eager, parallel);
+        assertBytes(cfs, eager, parallel);
     }
 
-    private static void assertBytes(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, AbstractCompactionIterable ci1, AbstractCompactionIterable ci2) throws IOException
+    private static void assertBytes(ColumnFamilyStore cfs, AbstractCompactionIterable ci1, AbstractCompactionIterable ci2) throws IOException
     {
         CloseableIterator<AbstractCompactedRow> iter1 = ci1.iterator();
         CloseableIterator<AbstractCompactedRow> iter2 = ci2.iterator();
@@ -89,8 +90,8 @@ public class LazilyCompactedRowTest extends SchemaLoader
             AbstractCompactedRow row2 = iter2.next();
             DataOutputBuffer out1 = new DataOutputBuffer();
             DataOutputBuffer out2 = new DataOutputBuffer();
-            row1.write(out1);
-            row2.write(out2);
+            row1.write(-1, out1);
+            row2.write(-1, out2);
 
             File tmpFile1 = File.createTempFile("lcrt1", null);
             File tmpFile2 = File.createTempFile("lcrt2", null);
@@ -104,28 +105,23 @@ public class LazilyCompactedRowTest extends SchemaLoader
             MappedFileDataInput in1 = new MappedFileDataInput(new FileInputStream(tmpFile1), tmpFile1.getAbsolutePath(), 0, 0);
             MappedFileDataInput in2 = new MappedFileDataInput(new FileInputStream(tmpFile2), tmpFile2.getAbsolutePath(), 0, 0);
 
-            // key isn't part of what CompactedRow writes, that's done by SSTW.append
-
-            // row size can differ b/c of bloom filter counts being different
-            long rowSize1 = in1.readLong();
-            long rowSize2 = in2.readLong();
-            assertEquals(rowSize1 + 8, out1.getLength());
-            assertEquals(rowSize2 + 8, out2.getLength());
+            // row key
+            assertEquals(ByteBufferUtil.readWithShortLength(in1), ByteBufferUtil.readWithShortLength(in2));
 
             // cf metadata
             ColumnFamily cf1 = TreeMapBackedSortedColumns.factory.create(cfs.metadata);
             ColumnFamily cf2 = TreeMapBackedSortedColumns.factory.create(cfs.metadata);
             cf1.delete(DeletionInfo.serializer().deserializeFromSSTable(in1, Descriptor.Version.CURRENT));
             cf2.delete(DeletionInfo.serializer().deserializeFromSSTable(in2, Descriptor.Version.CURRENT));
-            assert cf1.deletionInfo().equals(cf2.deletionInfo());
+            assertEquals(cf1.deletionInfo(), cf2.deletionInfo());
 
             // columns
-            int columns = in1.readInt();
-            assert columns == in2.readInt();
-            for (int i = 0; i < columns; i++)
+            while (true)
             {
                 Column c1 = (Column)Column.onDiskSerializer().deserializeFromSSTable(in1, Descriptor.Version.CURRENT);
                 Column c2 = (Column)Column.onDiskSerializer().deserializeFromSSTable(in2, Descriptor.Version.CURRENT);
-                assert c1.equals(c2) : c1.getString(cfs.metadata.comparator) + " != " + c2.getString(cfs.metadata.comparator);
+                assert Objects.equal(c1, c2) : c1.getString(cfs.metadata.comparator) + " != " + c2.getString(cfs.metadata.comparator);
+                if (c1 == null)
+                    break;
             }
 
             // that should be everything
             assert in1.available() == 0;
@@ -137,8 +133,8 @@ public class LazilyCompactedRowTest extends SchemaLoader
     {
         AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
         Collection<SSTableReader> sstables = cfs.getSSTables();
-        AbstractCompactionIterable ci1 = new CompactionIterable(OperationType.UNKNOWN, strategy.getScanners(sstables), new PreCompactingController(cfs, sstables, gcBefore, false));
-        AbstractCompactionIterable ci2 = new CompactionIterable(OperationType.UNKNOWN, strategy.getScanners(sstables), new LazilyCompactingController(cfs, sstables, gcBefore, false));
+        AbstractCompactionIterable ci1 = new CompactionIterable(OperationType.UNKNOWN, strategy.getScanners(sstables), new PreCompactingController(cfs, sstables, gcBefore));
+        AbstractCompactionIterable ci2 = new CompactionIterable(OperationType.UNKNOWN, strategy.getScanners(sstables), new LazilyCompactingController(cfs, sstables, gcBefore));
         CloseableIterator<AbstractCompactedRow> iter1 = ci1.iterator();
         CloseableIterator<AbstractCompactedRow> iter2 = ci2.iterator();
@@ -291,7 +287,7 @@ public class LazilyCompactedRowTest extends SchemaLoader
 
     private static class LazilyCompactingController extends CompactionController
     {
-        public LazilyCompactingController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize)
+        public LazilyCompactingController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore)
         {
             super(cfs, new HashSet<SSTableReader>(sstables), gcBefore);
         }
@@ -305,7 +301,7 @@ public class LazilyCompactedRowTest extends SchemaLoader
 
     private static class PreCompactingController extends CompactionController
     {
-        public PreCompactingController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize)
+        public PreCompactingController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore)
        {
             super(cfs, new HashSet<SSTableReader>(sstables), gcBefore);
         }
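The test's new comparison loop leans on a small trick: Objects.equal tolerates nulls, so asserting equality before checking for the end-of-row null verifies both that the columns match and that the two streams terminate at the same point. The same pattern in isolation, reusing the simplified column framing from the earlier sketch:

    import java.io.DataInput;
    import java.io.IOException;
    import java.util.Arrays;

    public class RowCompareSketch
    {
        // compare two sentinel-terminated column streams; they must match column for column
        // and reach END_OF_ROW together
        static void assertSameColumns(DataInput in1, DataInput in2) throws IOException
        {
            while (true)
            {
                byte[] c1 = readColumn(in1);
                byte[] c2 = readColumn(in2);
                // like Objects.equal in the test: two nulls compare equal, so this also
                // catches one stream ending before the other
                if (!Arrays.equals(c1, c2))
                    throw new AssertionError("column mismatch");
                if (c1 == null)
                    break; // both streams hit the end-of-row sentinel
            }
        }

        // returns the raw column bytes, or null at the 0x0000 sentinel (simplified framing)
        static byte[] readColumn(DataInput in) throws IOException
        {
            int nameLength = in.readUnsignedShort();
            if (nameLength == 0)
                return null;
            byte[] name = new byte[nameLength];
            in.readFully(name);
            byte[] value = new byte[in.readInt()];
            in.readFully(value);
            // flatten name+value so Arrays.equals can compare whole columns
            byte[] col = new byte[name.length + value.length];
            System.arraycopy(name, 0, col, 0, name.length);
            System.arraycopy(value, 0, col, name.length, value.length);
            return col;
        }
    }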
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java
deleted file mode 100644
index 135f444..0000000
--- a/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-*    http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing,
-* software distributed under the License is distributed on an
-* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-* KIND, either express or implied. See the License for the
-* specific language governing permissions and limitations
-* under the License.
-*/
-
-package org.apache.cassandra.io.sstable;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-import org.junit.Test;
-
-import org.apache.cassandra.db.*;
-import org.apache.cassandra.SchemaLoader;
-import org.apache.cassandra.io.util.RandomAccessReader;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.Util;
-
-public class SSTableTest extends SchemaLoader
-{
-    @Test
-    public void testSingleWrite() throws IOException
-    {
-        // write test data
-        ByteBuffer key = ByteBufferUtil.bytes(Integer.toString(1));
-        ByteBuffer cbytes = ByteBuffer.wrap(new byte[1024]);
-        new Random().nextBytes(cbytes.array());
-        ColumnFamily cf = TreeMapBackedSortedColumns.factory.create("Keyspace1", "Standard1");
-        cf.addColumn(new Column(cbytes, cbytes));
-
-        SortedMap<DecoratedKey, ColumnFamily> map = new TreeMap<DecoratedKey, ColumnFamily>();
-        map.put(Util.dk(key), cf);
-        SSTableReader ssTable = SSTableUtils.prepare().cf("Standard1").write(map);
-
-        // verify
-        ByteBuffer bytes = Util.serializeForSSTable(cf);
-        verifySingle(ssTable, bytes, key);
-        ssTable = SSTableReader.open(ssTable.descriptor); // read the index from disk
-        verifySingle(ssTable, bytes, key);
-    }
-
-    private void verifySingle(SSTableReader sstable, ByteBuffer bytes, ByteBuffer key) throws IOException
-    {
-        RandomAccessReader file = sstable.openDataReader();
-        file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key), SSTableReader.Operator.EQ).position);
-        assert key.equals(ByteBufferUtil.readWithShortLength(file));
-        int size = (int) file.readLong();
-        byte[] bytes2 = new byte[size];
-        file.readFully(bytes2);
-        assert ByteBuffer.wrap(bytes2).equals(bytes);
-    }
-
-    @Test
-    public void testManyWrites() throws IOException
-    {
-        SortedMap<DecoratedKey, ColumnFamily> map = new TreeMap<DecoratedKey, ColumnFamily>();
-        SortedMap<ByteBuffer, ByteBuffer> bytesMap = new TreeMap<ByteBuffer, ByteBuffer>();
-        //for (int i = 100; i < 1000; ++i)
-        for (int i = 100; i < 300; ++i)
-        {
-            ColumnFamily cf = TreeMapBackedSortedColumns.factory.create("Keyspace1", "Standard2");
-            ByteBuffer bytes = ByteBufferUtil.bytes(("Avinash Lakshman is a good man: " + i));
-            cf.addColumn(new Column(bytes, bytes));
-            map.put(Util.dk(Integer.toString(i)), cf);
-            bytesMap.put(ByteBufferUtil.bytes(Integer.toString(i)), Util.serializeForSSTable(cf));
-        }
-
-        // write
-        SSTableReader ssTable = SSTableUtils.prepare().cf("Standard2").write(map);
-
-        // verify
-        verifyMany(ssTable, bytesMap);
-        ssTable = SSTableReader.open(ssTable.descriptor); // read the index from disk
-        verifyMany(ssTable, bytesMap);
-
-        Set<Component> live = SSTable.componentsFor(ssTable.descriptor);
-        assert !live.isEmpty() : "SSTable has no live components";
-        Set<Component> temp = SSTable.componentsFor(ssTable.descriptor.asTemporary(true));
-        assert temp.isEmpty() : "SSTable has unexpected temp components";
-    }
-
-    private void verifyMany(SSTableReader sstable, Map<ByteBuffer, ByteBuffer> map) throws IOException
-    {
-        List<ByteBuffer> keys = new ArrayList<ByteBuffer>(map.keySet());
-        //Collections.shuffle(keys);
-        RandomAccessReader file = sstable.openDataReader();
-        for (ByteBuffer key : keys)
-        {
-            file.seek(sstable.getPosition(sstable.partitioner.decorateKey(key), SSTableReader.Operator.EQ).position);
-            assert key.equals(ByteBufferUtil.readWithShortLength(file));
-            int size = (int) file.readLong();
-            byte[] bytes2 = new byte[size];
-            file.readFully(bytes2);
-            assert Arrays.equals(bytes2, map.get(key).array());
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
index 73abbe2..c3e83cb 100644
--- a/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
+++ b/test/unit/org/apache/cassandra/service/LeaveAndBootstrapTest.java
@@ -50,7 +50,7 @@ public class LeaveAndBootstrapTest
     private static IPartitioner oldPartitioner;
 
     @BeforeClass
-    public static void setup() throws IOException
+    public static void setup() throws IOException, ConfigurationException
     {
         oldPartitioner = StorageService.instance.setPartitionerUnsafe(partitioner);
         SchemaLoader.loadSchema();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/service/MoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/MoveTest.java b/test/unit/org/apache/cassandra/service/MoveTest.java
index e30bbde..5454127 100644
--- a/test/unit/org/apache/cassandra/service/MoveTest.java
+++ b/test/unit/org/apache/cassandra/service/MoveTest.java
@@ -57,7 +57,7 @@ public class MoveTest
      * So instead of extending SchemaLoader, we call it's method below.
      */
     @BeforeClass
-    public static void setup() throws IOException
+    public static void setup() throws IOException, ConfigurationException
     {
         oldPartitioner = StorageService.instance.setPartitionerUnsafe(partitioner);
         SchemaLoader.loadSchema();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9bd531b/test/unit/org/apache/cassandra/service/RemoveTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/RemoveTest.java b/test/unit/org/apache/cassandra/service/RemoveTest.java
index 278c8f1..4f3217c 100644
--- a/test/unit/org/apache/cassandra/service/RemoveTest.java
+++ b/test/unit/org/apache/cassandra/service/RemoveTest.java
@@ -65,7 +65,7 @@ public class RemoveTest
     UUID removalId;
 
     @BeforeClass
-    public static void setupClass() throws IOException
+    public static void setupClass() throws IOException, ConfigurationException
     {
         oldPartitioner = StorageService.instance.setPartitionerUnsafe(partitioner);
         SchemaLoader.loadSchema();
