Updated Branches: refs/heads/trunk b8578df60 -> 3eff5455d
Merge branch 'cassandra-1.1' into trunk Conflicts: CHANGES.txt src/java/org/apache/cassandra/db/ColumnFamilyStore.java src/java/org/apache/cassandra/db/Table.java src/java/org/apache/cassandra/db/compaction/CompactionController.java src/java/org/apache/cassandra/db/compaction/CompactionManager.java src/java/org/apache/cassandra/io/sstable/SSTableReader.java src/java/org/apache/cassandra/service/StorageService.java src/java/org/apache/cassandra/tools/BulkLoader.java Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3eff5455 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3eff5455 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3eff5455 Branch: refs/heads/trunk Commit: 3eff5455dc58e6b7e0aadf987a635320f4bd5ef1 Parents: b8578df 09e5443 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Fri Jun 29 11:18:46 2012 +0200 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Fri Jun 29 11:18:46 2012 +0200 ---------------------------------------------------------------------- CHANGES.txt | 3 + bin/sstablescrub | 50 ++ .../org/apache/cassandra/db/ColumnFamilyStore.java | 28 +- src/java/org/apache/cassandra/db/DefsTable.java | 2 +- src/java/org/apache/cassandra/db/Table.java | 20 +- .../db/compaction/CompactionController.java | 12 +- .../cassandra/db/compaction/CompactionManager.java | 228 +--------- .../cassandra/db/compaction/LeveledManifest.java | 100 +++-- .../apache/cassandra/db/compaction/Scrubber.java | 357 +++++++++++++++ src/java/org/apache/cassandra/dht/Bounds.java | 6 + .../apache/cassandra/hadoop/BulkRecordWriter.java | 6 +- .../apache/cassandra/io/sstable/SSTableLoader.java | 12 +- .../apache/cassandra/io/sstable/SSTableReader.java | 25 + .../apache/cassandra/io/sstable/SSTableWriter.java | 4 +- .../apache/cassandra/service/StorageService.java | 9 +- .../org/apache/cassandra/tools/BulkLoader.java | 24 +- .../apache/cassandra/tools/StandaloneScrubber.java | 282 ++++++++++++ .../org/apache/cassandra/utils/OutputHandler.java | 95 ++++ test/unit/org/apache/cassandra/db/ScrubTest.java | 70 +++- 19 files changed, 1017 insertions(+), 316 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/CHANGES.txt ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index dc2459b,a40e52e..a92665e --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -222,11 -227,16 +223,16 @@@ public class ColumnFamilyStore implemen // scan for sstables corresponding to this cf and load them data = new DataTracker(this); - Directories.SSTableLister sstables = directories.sstableLister().skipCompacted(true).skipTemporary(true); - data.addInitialSSTables(SSTableReader.batchOpen(sstables.list().entrySet(), data, metadata, this.partitioner)); - Set<DecoratedKey> savedKeys = caching == Caching.NONE || caching == Caching.ROWS_ONLY - ? Collections.<DecoratedKey>emptySet() - : CacheService.instance.keyCache.readSaved(table.name, columnFamily, partitioner); + + if (loadSSTables) + { + Directories.SSTableLister sstables = directories.sstableLister().skipCompacted(true).skipTemporary(true); - data.addInitialSSTables(SSTableReader.batchOpen(sstables.list().entrySet(), savedKeys, data, metadata, this.partitioner)); ++ data.addInitialSSTables(SSTableReader.batchOpen(sstables.list().entrySet(), data, metadata, this.partitioner)); + } + + if (caching == Caching.ALL || caching == Caching.KEYS_ONLY) + CacheService.instance.keyCache.loadSaved(this); + // compaction strategy should be created after the CFS has been prepared this.compactionStrategy = metadata.createCompactionStrategyInstance(this); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/db/DefsTable.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/db/Table.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/Table.java index 388679a,f3a414e..60f2dda --- a/src/java/org/apache/cassandra/db/Table.java +++ b/src/java/org/apache/cassandra/db/Table.java @@@ -304,7 -347,7 +308,7 @@@ public class Tabl } /** adds a cf to internal structures, ends up creating disk files). */ - public void initCf(UUID cfId, String cfName) - public void initCf(Integer cfId, String cfName, boolean loadSSTables) ++ public void initCf(UUID cfId, String cfName, boolean loadSSTables) { if (columnFamilyStores.containsKey(cfId)) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/db/compaction/CompactionController.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java index c91684f,1a90ab0..4b42ed4 --- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java @@@ -63,6 -66,19 +63,15 @@@ public class CompactionControlle public CompactionController(ColumnFamilyStore cfs, Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize) { + this(cfs, + gcBefore, - forceDeserialize || !allLatestVersion(sstables), - DataTracker.buildIntervalTree(cfs.getOverlappingSSTables(sstables)), - cfs.getCompactionStrategy().isKeyExistenceExpensive(ImmutableSet.copyOf(sstables))); ++ DataTracker.buildIntervalTree(cfs.getOverlappingSSTables(sstables))); + } + + protected CompactionController(ColumnFamilyStore cfs, + int gcBefore, - boolean deserializeRequired, - IntervalTree<SSTableReader> overlappingTree, - boolean keyExistenceIsExpensive) ++ DataTracker.SSTableIntervalTree overlappingTree) + { assert cfs != null; this.cfs = cfs; this.gcBefore = gcBefore; @@@ -71,8 -87,9 +80,7 @@@ // add 5 minutes to be sure we're on the safe side in terms of thread safety (though we should be fine in our // current 'stop all write during memtable switch' situation). this.mergeShardBefore = (int) ((cfs.oldestUnflushedMemtable() + 5 * 3600) / 1000); - Set<SSTableReader> overlappingSSTables = cfs.getOverlappingSSTables(sstables); - overlappingTree = DataTracker.buildIntervalTree(overlappingSSTables); - this.deserializeRequired = deserializeRequired; + this.overlappingTree = overlappingTree; - this.keyExistenceIsExpensive = keyExistenceIsExpensive; } public String getKeyspace() http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/db/compaction/Scrubber.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java index 0000000,314a873..aeff46f mode 000000,100644..100644 --- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java +++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java @@@ -1,0 -1,355 +1,357 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.cassandra.db.compaction; + + import java.nio.ByteBuffer; + import java.io.*; + import java.util.*; + + import org.apache.cassandra.config.DatabaseDescriptor; + import org.apache.cassandra.db.*; + import org.apache.cassandra.io.sstable.*; + import org.apache.cassandra.io.util.FileUtils; + import org.apache.cassandra.io.util.RandomAccessReader; + import org.apache.cassandra.utils.ByteBufferUtil; + import org.apache.cassandra.utils.FBUtilities; + import org.apache.cassandra.utils.IntervalTree.*; + import org.apache.cassandra.utils.OutputHandler; + + public class Scrubber implements Closeable + { + public final ColumnFamilyStore cfs; + public final SSTableReader sstable; + public final File destination; + + private final CompactionController controller; + private final boolean isCommutative; + private final int expectedBloomFilterSize; + + private final RandomAccessReader dataFile; + private final RandomAccessReader indexFile; + private final ScrubInfo scrubInfo; + + private long rowsRead; + + private SSTableWriter writer; + private SSTableReader newSstable; + private SSTableReader newInOrderSstable; + + private int goodRows; + private int badRows; + private int emptyRows; + + private final OutputHandler outputHandler; + + private static final Comparator<AbstractCompactedRow> acrComparator = new Comparator<AbstractCompactedRow>() + { + public int compare(AbstractCompactedRow r1, AbstractCompactedRow r2) + { + return r1.key.compareTo(r2.key); + } + }; + private final Set<AbstractCompactedRow> outOfOrderRows = new TreeSet<AbstractCompactedRow>(acrComparator); + + public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable) throws IOException + { + this(cfs, sstable, new OutputHandler.LogOutput(), false); + } + + public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, OutputHandler outputHandler, boolean isOffline) throws IOException + { + this.cfs = cfs; + this.sstable = sstable; + this.outputHandler = outputHandler; + + // Calculate the expected compacted filesize + this.destination = cfs.directories.getDirectoryForNewSSTables(sstable.onDiskLength()); + if (destination == null) + throw new IOException("disk full"); + + List<SSTableReader> toScrub = Collections.singletonList(sstable); + // If we run scrub offline, we should never purge tombstone, as we cannot know if other sstable have data that the tombstone deletes. + this.controller = isOffline + ? new ScrubController(cfs) + : new CompactionController(cfs, Collections.singletonList(sstable), CompactionManager.getDefaultGcBefore(cfs), true); + this.isCommutative = cfs.metadata.getDefaultValidator().isCommutative(); + this.expectedBloomFilterSize = Math.max(DatabaseDescriptor.getIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(toScrub))); + + // loop through each row, deserializing to check for damage. + // we'll also loop through the index at the same time, using the position from the index to recover if the + // row header (key or data size) is corrupt. (This means our position in the index file will be one row + // "ahead" of the data file.) + this.dataFile = sstable.openDataReader(true); + this.indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)), true); + this.scrubInfo = new ScrubInfo(dataFile, sstable); + } + + public void scrub() throws IOException + { + outputHandler.output("Scrubbing " + sstable); + try + { + ByteBuffer nextIndexKey = ByteBufferUtil.readWithShortLength(indexFile); + { + // throw away variable so we don't have a side effect in the assert - long firstRowPositionFromIndex = indexFile.readLong(); ++ long firstRowPositionFromIndex = RowIndexEntry.serializer.deserialize(indexFile, sstable.descriptor.version).position; + assert firstRowPositionFromIndex == 0 : firstRowPositionFromIndex; + } + + // TODO errors when creating the writer may leave empty temp files. + writer = CompactionManager.maybeCreateWriter(cfs, destination, expectedBloomFilterSize, null, Collections.singletonList(sstable)); + + AbstractCompactedRow prevRow = null; + + while (!dataFile.isEOF()) + { + if (scrubInfo.isStopRequested()) + throw new CompactionInterruptedException(scrubInfo.getCompactionInfo()); + long rowStart = dataFile.getFilePointer(); + outputHandler.debug("Reading row at " + rowStart); + + DecoratedKey key = null; + long dataSize = -1; + try + { + key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, ByteBufferUtil.readWithShortLength(dataFile)); - dataSize = sstable.descriptor.hasIntRowSize ? dataFile.readInt() : dataFile.readLong(); ++ dataSize = sstable.descriptor.version.hasIntRowSize ? dataFile.readInt() : dataFile.readLong(); + outputHandler.debug(String.format("row %s is %s bytes", ByteBufferUtil.bytesToHex(key.key), dataSize)); + } + catch (Throwable th) + { + throwIfFatal(th); + // check for null key below + } + + ByteBuffer currentIndexKey = nextIndexKey; + long nextRowPositionFromIndex; + try + { + nextIndexKey = indexFile.isEOF() ? null : ByteBufferUtil.readWithShortLength(indexFile); - nextRowPositionFromIndex = indexFile.isEOF() ? dataFile.length() : indexFile.readLong(); ++ nextRowPositionFromIndex = indexFile.isEOF() ++ ? dataFile.length() ++ : RowIndexEntry.serializer.deserialize(indexFile, sstable.descriptor.version).position; + } + catch (Throwable th) + { + outputHandler.warn("Error reading index file", th); + nextIndexKey = null; + nextRowPositionFromIndex = dataFile.length(); + } + + long dataStart = dataFile.getFilePointer(); + long dataStartFromIndex = currentIndexKey == null + ? -1 - : rowStart + 2 + currentIndexKey.remaining() + (sstable.descriptor.hasIntRowSize ? 4 : 8); ++ : rowStart + 2 + currentIndexKey.remaining() + (sstable.descriptor.version.hasIntRowSize ? 4 : 8); + long dataSizeFromIndex = nextRowPositionFromIndex - dataStartFromIndex; + assert currentIndexKey != null || indexFile.isEOF(); + if (currentIndexKey != null) + outputHandler.debug(String.format("Index doublecheck: row %s is %s bytes", ByteBufferUtil.bytesToHex(currentIndexKey), dataSizeFromIndex)); + + writer.mark(); + try + { + if (key == null) + throw new IOError(new IOException("Unable to read row key from data file")); + if (dataSize > dataFile.length()) + throw new IOError(new IOException("Impossible row size " + dataSize)); + SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true); + AbstractCompactedRow compactedRow = controller.getCompactedRow(row); + if (compactedRow.isEmpty()) + { + emptyRows++; + } + else + { + if (prevRow != null && acrComparator.compare(prevRow, compactedRow) > 0) + { + outOfOrderRows.add(compactedRow); + outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key)); + continue; + } + + writer.append(compactedRow); + prevRow = compactedRow; + goodRows++; + } + if (!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex) + outputHandler.warn("Index file contained a different key or row size; using key from data file"); + } + catch (Throwable th) + { + throwIfFatal(th); + outputHandler.warn("Non-fatal error reading row (stacktrace follows)", th); + writer.resetAndTruncate(); + + if (currentIndexKey != null + && (key == null || !key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex || dataSize != dataSizeFromIndex)) + { + outputHandler.output(String.format("Retrying from row index; data is %s bytes starting at %s", + dataSizeFromIndex, dataStartFromIndex)); + key = SSTableReader.decodeKey(sstable.partitioner, sstable.descriptor, currentIndexKey); + try + { + SSTableIdentityIterator row = new SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, dataSizeFromIndex, true); + AbstractCompactedRow compactedRow = controller.getCompactedRow(row); + if (compactedRow.isEmpty()) + { + emptyRows++; + } + else + { + if (prevRow != null && acrComparator.compare(prevRow, compactedRow) > 0) + { + outOfOrderRows.add(compactedRow); + outputHandler.warn(String.format("Out of order row detected (%s found after %s)", compactedRow.key, prevRow.key)); + continue; + } + writer.append(compactedRow); + prevRow = compactedRow; + goodRows++; + } + } + catch (Throwable th2) + { + throwIfFatal(th2); + // Skipping rows is dangerous for counters (see CASSANDRA-2759) + if (isCommutative) + throw new IOError(th2); + + outputHandler.warn("Retry failed too. Skipping to next row (retry's stacktrace follows)", th2); + writer.resetAndTruncate(); + dataFile.seek(nextRowPositionFromIndex); + badRows++; + } + } + else + { + // Skipping rows is dangerous for counters (see CASSANDRA-2759) + if (isCommutative) + throw new IOError(th); + + outputHandler.warn("Row at " + dataStart + " is unreadable; skipping to next"); + if (currentIndexKey != null) + dataFile.seek(nextRowPositionFromIndex); + badRows++; + } + } + if ((rowsRead++ % 1000) == 0) + controller.mayThrottle(dataFile.getFilePointer()); + } + + if (writer.getFilePointer() > 0) + newSstable = writer.closeAndOpenReader(sstable.maxDataAge); + } + catch (Exception e) + { + if (writer != null) + writer.abort(); + throw FBUtilities.unchecked(e); + } + + if (!outOfOrderRows.isEmpty()) + { + SSTableWriter inOrderWriter = CompactionManager.maybeCreateWriter(cfs, destination, expectedBloomFilterSize, null, Collections.singletonList(sstable)); + for (AbstractCompactedRow row : outOfOrderRows) + inOrderWriter.append(row); + newInOrderSstable = inOrderWriter.closeAndOpenReader(sstable.maxDataAge); + outputHandler.warn(String.format("%d out of order rows found while scrubbing %s; Those have been written (in order) to a new sstable (%s)", outOfOrderRows.size(), sstable, newInOrderSstable)); + } + + if (newSstable == null) + { + if (badRows > 0) + outputHandler.warn("No valid rows found while scrubbing " + sstable + "; it is marked for deletion now. If you want to attempt manual recovery, you can find a copy in the pre-scrub snapshot"); + else + outputHandler.output("Scrub of " + sstable + " complete; looks like all " + emptyRows + " rows were tombstoned"); + } + else + { + outputHandler.output("Scrub of " + sstable + " complete: " + goodRows + " rows in new sstable and " + emptyRows + " empty (tombstoned) rows dropped"); + if (badRows > 0) + outputHandler.warn("Unable to recover " + badRows + " rows that were skipped. You can attempt manual recovery from the pre-scrub snapshot. You can also run nodetool repair to transfer the data from a healthy replica, if any"); + } + } + + public SSTableReader getNewSSTable() + { + return newSstable; + } + + public SSTableReader getNewInOrderSSTable() + { + return newInOrderSstable; + } + + private void throwIfFatal(Throwable th) + { + if (th instanceof Error && !(th instanceof AssertionError || th instanceof IOError)) + throw (Error) th; + } + + public void close() + { + FileUtils.closeQuietly(dataFile); + FileUtils.closeQuietly(indexFile); + } + + public CompactionInfo.Holder getScrubInfo() + { + return scrubInfo; + } + + private static class ScrubInfo extends CompactionInfo.Holder + { + private final RandomAccessReader dataFile; + private final SSTableReader sstable; + + public ScrubInfo(RandomAccessReader dataFile, SSTableReader sstable) + { + this.dataFile = dataFile; + this.sstable = sstable; + } + + public CompactionInfo getCompactionInfo() + { + try + { + return new CompactionInfo(sstable.metadata, + OperationType.SCRUB, + dataFile.getFilePointer(), + dataFile.length()); + } + catch (Exception e) + { + throw new RuntimeException(); + } + } + } + + private static class ScrubController extends CompactionController + { + public ScrubController(ColumnFamilyStore cfs) + { - super(cfs, Integer.MAX_VALUE, true, new IntervalTree<SSTableReader>(Collections.<Interval>emptyList()), false); ++ super(cfs, Integer.MAX_VALUE, DataTracker.buildIntervalTree(Collections.<SSTableReader>emptyList())); + } + + @Override + public boolean shouldPurge(DecoratedKey key) + { + return false; + } + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/dht/Bounds.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/io/sstable/SSTableReader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/SSTableReader.java index 5edd287,0aaa932..680fd8e --- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java @@@ -138,13 -138,29 +138,28 @@@ public class SSTableReader extends SSTa return open(desc, componentsFor(desc), metadata, p); } + public static SSTableReader openNoValidation(Descriptor descriptor, Set<Component> components, CFMetaData metadata) throws IOException + { - return open(descriptor, components, Collections.<DecoratedKey>emptySet(), null, metadata, StorageService.getPartitioner(), false); ++ return open(descriptor, components, null, metadata, StorageService.getPartitioner(), false); + } + public static SSTableReader open(Descriptor descriptor, Set<Component> components, CFMetaData metadata, IPartitioner partitioner) throws IOException { - return open(descriptor, components, Collections.<DecoratedKey>emptySet(), null, metadata, partitioner); + return open(descriptor, components, null, metadata, partitioner); } - public static SSTableReader open(Descriptor descriptor, Set<Component> components, Set<DecoratedKey> savedKeys, DataTracker tracker, CFMetaData metadata, IPartitioner partitioner) throws IOException + public static SSTableReader open(Descriptor descriptor, Set<Component> components, DataTracker tracker, CFMetaData metadata, IPartitioner partitioner) throws IOException { - return open(descriptor, components, savedKeys, tracker, metadata, partitioner, true); ++ return open(descriptor, components, tracker, metadata, partitioner, true); + } + + private static SSTableReader open(Descriptor descriptor, + Set<Component> components, - Set<DecoratedKey> savedKeys, + DataTracker tracker, + CFMetaData metadata, + IPartitioner partitioner, + boolean validate) throws IOException + { assert partitioner != null; // Minimum components without which we can't do anything assert components.contains(Component.DATA); @@@ -184,9 -200,13 +199,13 @@@ } else { - sstable.load(false, savedKeys); + sstable.load(false); sstable.loadBloomFilter(); } + + if (validate) + sstable.validate(); + if (logger.isDebugEnabled()) logger.debug("INDEX LOAD TIME for " + descriptor + ": " + (System.currentTimeMillis() - start) + " ms."); @@@ -389,75 -433,16 +408,81 @@@ // finalize the state of the reader ifile = ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX)); dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA)); + if (readIndex) // save summary information to disk + saveSummary(this, ibuilder, dbuilder); + } + + public static boolean loadSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder) + { + File summariesFile = new File(reader.descriptor.filenameFor(Component.SUMMARY)); + if (!summariesFile.exists()) + return false; + + DataInputStream iStream = null; + try + { + iStream = new DataInputStream(new FileInputStream(summariesFile)); + reader.indexSummary = IndexSummary.serializer.deserialize(iStream, reader.partitioner); + reader.first = decodeKey(reader.partitioner, reader.descriptor, ByteBufferUtil.readWithLength(iStream)); + reader.last = decodeKey(reader.partitioner, reader.descriptor, ByteBufferUtil.readWithLength(iStream)); + ibuilder.deserializeBounds(iStream); + dbuilder.deserializeBounds(iStream); + } + catch (IOException e) + { + logger.debug("Cannot deserialize SSTable Summary: ", e); + // corrupted hence delete it and let it load it now. + if (summariesFile.exists()) + summariesFile.delete(); + + return false; + } + finally + { + FileUtils.closeQuietly(iStream); + } + + return true; + } + + public static void saveSummary(SSTableReader reader, SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder) + { + File summariesFile = new File(reader.descriptor.filenameFor(Component.SUMMARY)); + if (summariesFile.exists()) + summariesFile.delete(); + + DataOutputStream oStream = null; + try + { + oStream = new DataOutputStream(new FileOutputStream(summariesFile)); + IndexSummary.serializer.serialize(reader.indexSummary, oStream); + ByteBufferUtil.writeWithLength(reader.first.key, oStream); + ByteBufferUtil.writeWithLength(reader.last.key, oStream); + ibuilder.serializeBounds(oStream); + dbuilder.serializeBounds(oStream); + } + catch (IOException e) + { + logger.debug("Cannot save SSTable Summary: ", e); + + // corrupted hence delete it and let it load it now. + if (summariesFile.exists()) + summariesFile.delete(); + } + finally + { + FileUtils.closeQuietly(oStream); + } } + private void validate() + { + if (this.first.compareTo(this.last) > 0) + throw new IllegalStateException(String.format("SSTable first key %s > last key %s", this.first, this.last)); + } + /** get the position in the index file to start scanning to find the given key (at most indexInterval keys away) */ - private long getIndexScanPosition(RowPosition key) + public long getIndexScanPosition(RowPosition key) { assert indexSummary.getKeys() != null && indexSummary.getKeys().size() > 0; int index = Collections.binarySearch(indexSummary.getKeys(), key); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/io/sstable/SSTableWriter.java index 2fc9771,5619951..05f5b25 --- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java @@@ -130,11 -127,11 +130,11 @@@ public class SSTableWriter extends SSTa /** * Perform sanity checks on @param decoratedKey and @return the position in the data file before any data is written */ - private long beforeAppend(DecoratedKey<?> decoratedKey) throws IOException + private long beforeAppend(DecoratedKey decoratedKey) throws IOException { assert decoratedKey != null : "Keys must not be null"; - assert lastWrittenKey == null || lastWrittenKey.compareTo(decoratedKey) < 0 - : "Last written key " + lastWrittenKey + " >= current key " + decoratedKey + " writing into " + getFilename(); + if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey) >= 0) + throw new RuntimeException("Last written key " + lastWrittenKey + " >= current key " + decoratedKey + " writing into " + getFilename()); return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/tools/BulkLoader.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/tools/BulkLoader.java index 8465fb6,ace37db..24b2423 --- a/src/java/org/apache/cassandra/tools/BulkLoader.java +++ b/src/java/org/apache/cassandra/tools/BulkLoader.java @@@ -173,11 -176,11 +175,11 @@@ public class BulkLoade static class ExternalClient extends SSTableLoader.Client { private final Map<String, Set<String>> knownCfs = new HashMap<String, Set<String>>(); - private final SSTableLoader.OutputHandler outputHandler; + private final OutputHandler outputHandler; - private Set<InetAddress> hosts = new HashSet<InetAddress>(); - private int rpcPort; + private final Set<InetAddress> hosts; + private final int rpcPort; - public ExternalClient(SSTableLoader.OutputHandler outputHandler, Set<InetAddress> hosts, int port) + public ExternalClient(OutputHandler outputHandler, Set<InetAddress> hosts, int port) { super(); this.outputHandler = outputHandler; http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/test/unit/org/apache/cassandra/db/ScrubTest.java ----------------------------------------------------------------------