Updated Branches:
refs/heads/trunk b8578df60 -> 3eff5455d
Merge branch 'cassandra-1.1' into trunk
Conflicts:
CHANGES.txt
src/java/org/apache/cassandra/db/ColumnFamilyStore.java
src/java/org/apache/cassandra/db/Table.java
src/java/org/apache/cassandra/db/compaction/CompactionController.java
src/java/org/apache/cassandra/db/compaction/CompactionManager.java
src/java/org/apache/cassandra/io/sstable/SSTableReader.java
src/java/org/apache/cassandra/service/StorageService.java
src/java/org/apache/cassandra/tools/BulkLoader.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3eff5455
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3eff5455
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3eff5455
Branch: refs/heads/trunk
Commit: 3eff5455dc58e6b7e0aadf987a635320f4bd5ef1
Parents: b8578df 09e5443
Author: Sylvain Lebresne <[email protected]>
Authored: Fri Jun 29 11:18:46 2012 +0200
Committer: Sylvain Lebresne <[email protected]>
Committed: Fri Jun 29 11:18:46 2012 +0200
----------------------------------------------------------------------
CHANGES.txt | 3 +
bin/sstablescrub | 50 ++
.../org/apache/cassandra/db/ColumnFamilyStore.java | 28 +-
src/java/org/apache/cassandra/db/DefsTable.java | 2 +-
src/java/org/apache/cassandra/db/Table.java | 20 +-
.../db/compaction/CompactionController.java | 12 +-
.../cassandra/db/compaction/CompactionManager.java | 228 +---------
.../cassandra/db/compaction/LeveledManifest.java | 100 +++--
.../apache/cassandra/db/compaction/Scrubber.java | 357 +++++++++++++++
src/java/org/apache/cassandra/dht/Bounds.java | 6 +
.../apache/cassandra/hadoop/BulkRecordWriter.java | 6 +-
.../apache/cassandra/io/sstable/SSTableLoader.java | 12 +-
.../apache/cassandra/io/sstable/SSTableReader.java | 25 +
.../apache/cassandra/io/sstable/SSTableWriter.java | 4 +-
.../apache/cassandra/service/StorageService.java | 9 +-
.../org/apache/cassandra/tools/BulkLoader.java | 24 +-
.../apache/cassandra/tools/StandaloneScrubber.java | 282 ++++++++++++
.../org/apache/cassandra/utils/OutputHandler.java | 95 ++++
test/unit/org/apache/cassandra/db/ScrubTest.java | 70 +++-
19 files changed, 1017 insertions(+), 316 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index dc2459b,a40e52e..a92665e
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -222,11 -227,16 +223,16 @@@ public class ColumnFamilyStore implemen
// scan for sstables corresponding to this cf and load them
data = new DataTracker(this);
- Directories.SSTableLister sstables = directories.sstableLister().skipCompacted(true).skipTemporary(true);
- data.addInitialSSTables(SSTableReader.batchOpen(sstables.list().entrySet(), data, metadata, this.partitioner));
- Set<DecoratedKey> savedKeys = caching == Caching.NONE || caching == Caching.ROWS_ONLY
- ? Collections.<DecoratedKey>emptySet()
- : CacheService.instance.keyCache.readSaved(table.name, columnFamily, partitioner);
+
+ if (loadSSTables)
+ {
+ Directories.SSTableLister sstables = directories.sstableLister().skipCompacted(true).skipTemporary(true);
- data.addInitialSSTables(SSTableReader.batchOpen(sstables.list().entrySet(), savedKeys, data, metadata, this.partitioner));
++ data.addInitialSSTables(SSTableReader.batchOpen(sstables.list().entrySet(), data, metadata, this.partitioner));
+ }
+
+ if (caching == Caching.ALL || caching == Caching.KEYS_ONLY)
+ CacheService.instance.keyCache.loadSaved(this);
+
// compaction strategy should be created after the CFS has been
prepared
this.compactionStrategy =
metadata.createCompactionStrategyInstance(this);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/db/DefsTable.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Table.java
index 388679a,f3a414e..60f2dda
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@@ -304,7 -347,7 +308,7 @@@ public class Tabl
}
/** adds a cf to internal structures, ends up creating disk files). */
- public void initCf(UUID cfId, String cfName)
- public void initCf(Integer cfId, String cfName, boolean loadSSTables)
++ public void initCf(UUID cfId, String cfName, boolean loadSSTables)
{
if (columnFamilyStores.containsKey(cfId))
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java
index c91684f,1a90ab0..4b42ed4
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@@ -63,6 -66,19 +63,15 @@@ public class CompactionControlle
public CompactionController(ColumnFamilyStore cfs,
Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize)
{
+ this(cfs,
+ gcBefore,
- forceDeserialize || !allLatestVersion(sstables),
- DataTracker.buildIntervalTree(cfs.getOverlappingSSTables(sstables)),
- cfs.getCompactionStrategy().isKeyExistenceExpensive(ImmutableSet.copyOf(sstables)));
++ DataTracker.buildIntervalTree(cfs.getOverlappingSSTables(sstables)));
+ }
+
+ protected CompactionController(ColumnFamilyStore cfs,
+ int gcBefore,
- boolean deserializeRequired,
- IntervalTree<SSTableReader>
overlappingTree,
- boolean keyExistenceIsExpensive)
++ DataTracker.SSTableIntervalTree
overlappingTree)
+ {
assert cfs != null;
this.cfs = cfs;
this.gcBefore = gcBefore;
@@@ -71,8 -87,9 +80,7 @@@
// add 5 minutes to be sure we're on the safe side in terms of thread
safety (though we should be fine in our
// current 'stop all write during memtable switch' situation).
this.mergeShardBefore = (int) ((cfs.oldestUnflushedMemtable() + 5 *
3600) / 1000);
- Set<SSTableReader> overlappingSSTables =
cfs.getOverlappingSSTables(sstables);
- overlappingTree = DataTracker.buildIntervalTree(overlappingSSTables);
- this.deserializeRequired = deserializeRequired;
+ this.overlappingTree = overlappingTree;
- this.keyExistenceIsExpensive = keyExistenceIsExpensive;
}
public String getKeyspace()
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 0000000,314a873..aeff46f
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@@ -1,0 -1,355 +1,357 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.cassandra.db.compaction;
+
+ import java.nio.ByteBuffer;
+ import java.io.*;
+ import java.util.*;
+
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.*;
+ import org.apache.cassandra.io.sstable.*;
+ import org.apache.cassandra.io.util.FileUtils;
+ import org.apache.cassandra.io.util.RandomAccessReader;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+ import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.IntervalTree.*;
+ import org.apache.cassandra.utils.OutputHandler;
+
+ public class Scrubber implements Closeable
+ {
+ public final ColumnFamilyStore cfs;
+ public final SSTableReader sstable;
+ public final File destination;
+
+ private final CompactionController controller;
+ private final boolean isCommutative;
+ private final int expectedBloomFilterSize;
+
+ private final RandomAccessReader dataFile;
+ private final RandomAccessReader indexFile;
+ private final ScrubInfo scrubInfo;
+
+ private long rowsRead;
+
+ private SSTableWriter writer;
+ private SSTableReader newSstable;
+ private SSTableReader newInOrderSstable;
+
+ private int goodRows;
+ private int badRows;
+ private int emptyRows;
+
+ private final OutputHandler outputHandler;
+
+ private static final Comparator<AbstractCompactedRow> acrComparator = new
Comparator<AbstractCompactedRow>()
+ {
+ public int compare(AbstractCompactedRow r1, AbstractCompactedRow r2)
+ {
+ return r1.key.compareTo(r2.key);
+ }
+ };
+ private final Set<AbstractCompactedRow> outOfOrderRows = new
TreeSet<AbstractCompactedRow>(acrComparator);
+
+ public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable) throws
IOException
+ {
+ this(cfs, sstable, new OutputHandler.LogOutput(), false);
+ }
+
+ public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable,
OutputHandler outputHandler, boolean isOffline) throws IOException
+ {
+ this.cfs = cfs;
+ this.sstable = sstable;
+ this.outputHandler = outputHandler;
+
+ // Calculate the expected compacted filesize
+ this.destination =
cfs.directories.getDirectoryForNewSSTables(sstable.onDiskLength());
+ if (destination == null)
+ throw new IOException("disk full");
+
+ List<SSTableReader> toScrub = Collections.singletonList(sstable);
+ // If we run scrub offline, we should never purge tombstone, as we
cannot know if other sstable have data that the tombstone deletes.
+ this.controller = isOffline
+ ? new ScrubController(cfs)
+ : new CompactionController(cfs,
Collections.singletonList(sstable), CompactionManager.getDefaultGcBefore(cfs),
true);
+ this.isCommutative =
cfs.metadata.getDefaultValidator().isCommutative();
+ this.expectedBloomFilterSize =
Math.max(DatabaseDescriptor.getIndexInterval(),
(int)(SSTableReader.getApproximateKeyCount(toScrub)));
+
+ // loop through each row, deserializing to check for damage.
+ // we'll also loop through the index at the same time, using the
position from the index to recover if the
+ // row header (key or data size) is corrupt. (This means our position
in the index file will be one row
+ // "ahead" of the data file.)
+ this.dataFile = sstable.openDataReader(true);
+ this.indexFile = RandomAccessReader.open(new
File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
+ this.scrubInfo = new ScrubInfo(dataFile, sstable);
+ }
+
+ public void scrub() throws IOException
+ {
+ outputHandler.output("Scrubbing " + sstable);
+ try
+ {
+ ByteBuffer nextIndexKey =
ByteBufferUtil.readWithShortLength(indexFile);
+ {
+ // throw away variable so we don't have a side effect in the
assert
- long firstRowPositionFromIndex = indexFile.readLong();
++ long firstRowPositionFromIndex =
RowIndexEntry.serializer.deserialize(indexFile,
sstable.descriptor.version).position;
+ assert firstRowPositionFromIndex == 0 :
firstRowPositionFromIndex;
+ }
+
+ // TODO errors when creating the writer may leave empty temp
files.
+ writer = CompactionManager.maybeCreateWriter(cfs, destination,
expectedBloomFilterSize, null, Collections.singletonList(sstable));
+
+ AbstractCompactedRow prevRow = null;
+
+ while (!dataFile.isEOF())
+ {
+ if (scrubInfo.isStopRequested())
+ throw new
CompactionInterruptedException(scrubInfo.getCompactionInfo());
+ long rowStart = dataFile.getFilePointer();
+ outputHandler.debug("Reading row at " + rowStart);
+
+ DecoratedKey key = null;
+ long dataSize = -1;
+ try
+ {
+ key = SSTableReader.decodeKey(sstable.partitioner,
sstable.descriptor, ByteBufferUtil.readWithShortLength(dataFile));
- dataSize = sstable.descriptor.hasIntRowSize ?
dataFile.readInt() : dataFile.readLong();
++ dataSize = sstable.descriptor.version.hasIntRowSize ?
dataFile.readInt() : dataFile.readLong();
+ outputHandler.debug(String.format("row %s is %s bytes",
ByteBufferUtil.bytesToHex(key.key), dataSize));
+ }
+ catch (Throwable th)
+ {
+ throwIfFatal(th);
+ // check for null key below
+ }
+
+ ByteBuffer currentIndexKey = nextIndexKey;
+ long nextRowPositionFromIndex;
+ try
+ {
+ nextIndexKey = indexFile.isEOF() ? null :
ByteBufferUtil.readWithShortLength(indexFile);
- nextRowPositionFromIndex = indexFile.isEOF() ?
dataFile.length() : indexFile.readLong();
++ nextRowPositionFromIndex = indexFile.isEOF()
++ ? dataFile.length()
++ : RowIndexEntry.serializer.deserialize(indexFile, sstable.descriptor.version).position;
+ }
+ catch (Throwable th)
+ {
+ outputHandler.warn("Error reading index file", th);
+ nextIndexKey = null;
+ nextRowPositionFromIndex = dataFile.length();
+ }
+
+ long dataStart = dataFile.getFilePointer();
+ long dataStartFromIndex = currentIndexKey == null
+ ? -1
- : rowStart + 2 +
currentIndexKey.remaining() + (sstable.descriptor.hasIntRowSize ? 4 : 8);
++ : rowStart + 2 +
currentIndexKey.remaining() + (sstable.descriptor.version.hasIntRowSize ? 4 :
8);
+ long dataSizeFromIndex = nextRowPositionFromIndex -
dataStartFromIndex;
+ assert currentIndexKey != null || indexFile.isEOF();
+ if (currentIndexKey != null)
+ outputHandler.debug(String.format("Index doublecheck: row
%s is %s bytes", ByteBufferUtil.bytesToHex(currentIndexKey),
dataSizeFromIndex));
+
+ writer.mark();
+ try
+ {
+ if (key == null)
+ throw new IOError(new IOException("Unable to read row
key from data file"));
+ if (dataSize > dataFile.length())
+ throw new IOError(new IOException("Impossible row
size " + dataSize));
+ SSTableIdentityIterator row = new
SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
+ AbstractCompactedRow compactedRow =
controller.getCompactedRow(row);
+ if (compactedRow.isEmpty())
+ {
+ emptyRows++;
+ }
+ else
+ {
+ if (prevRow != null && acrComparator.compare(prevRow,
compactedRow) > 0)
+ {
+ outOfOrderRows.add(compactedRow);
+ outputHandler.warn(String.format("Out of order
row detected (%s found after %s)", compactedRow.key, prevRow.key));
+ continue;
+ }
+
+ writer.append(compactedRow);
+ prevRow = compactedRow;
+ goodRows++;
+ }
+ if (!key.key.equals(currentIndexKey) || dataStart !=
dataStartFromIndex)
+ outputHandler.warn("Index file contained a different
key or row size; using key from data file");
+ }
+ catch (Throwable th)
+ {
+ throwIfFatal(th);
+ outputHandler.warn("Non-fatal error reading row
(stacktrace follows)", th);
+ writer.resetAndTruncate();
+
+ if (currentIndexKey != null
+ && (key == null || !key.key.equals(currentIndexKey)
|| dataStart != dataStartFromIndex || dataSize != dataSizeFromIndex))
+ {
+ outputHandler.output(String.format("Retrying from row
index; data is %s bytes starting at %s",
+ dataSizeFromIndex,
dataStartFromIndex));
+ key = SSTableReader.decodeKey(sstable.partitioner,
sstable.descriptor, currentIndexKey);
+ try
+ {
+ SSTableIdentityIterator row = new
SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex,
dataSizeFromIndex, true);
+ AbstractCompactedRow compactedRow =
controller.getCompactedRow(row);
+ if (compactedRow.isEmpty())
+ {
+ emptyRows++;
+ }
+ else
+ {
+ if (prevRow != null &&
acrComparator.compare(prevRow, compactedRow) > 0)
+ {
+ outOfOrderRows.add(compactedRow);
+ outputHandler.warn(String.format("Out of
order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+ continue;
+ }
+ writer.append(compactedRow);
+ prevRow = compactedRow;
+ goodRows++;
+ }
+ }
+ catch (Throwable th2)
+ {
+ throwIfFatal(th2);
+ // Skipping rows is dangerous for counters (see
CASSANDRA-2759)
+ if (isCommutative)
+ throw new IOError(th2);
+
+ outputHandler.warn("Retry failed too. Skipping to
next row (retry's stacktrace follows)", th2);
+ writer.resetAndTruncate();
+ dataFile.seek(nextRowPositionFromIndex);
+ badRows++;
+ }
+ }
+ else
+ {
+ // Skipping rows is dangerous for counters (see
CASSANDRA-2759)
+ if (isCommutative)
+ throw new IOError(th);
+
+ outputHandler.warn("Row at " + dataStart + " is
unreadable; skipping to next");
+ if (currentIndexKey != null)
+ dataFile.seek(nextRowPositionFromIndex);
+ badRows++;
+ }
+ }
+ if ((rowsRead++ % 1000) == 0)
+ controller.mayThrottle(dataFile.getFilePointer());
+ }
+
+ if (writer.getFilePointer() > 0)
+ newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
+ }
+ catch (Exception e)
+ {
+ if (writer != null)
+ writer.abort();
+ throw FBUtilities.unchecked(e);
+ }
+
+ if (!outOfOrderRows.isEmpty())
+ {
+ SSTableWriter inOrderWriter =
CompactionManager.maybeCreateWriter(cfs, destination, expectedBloomFilterSize,
null, Collections.singletonList(sstable));
+ for (AbstractCompactedRow row : outOfOrderRows)
+ inOrderWriter.append(row);
+ newInOrderSstable =
inOrderWriter.closeAndOpenReader(sstable.maxDataAge);
+ outputHandler.warn(String.format("%d out of order rows found
while scrubbing %s; Those have been written (in order) to a new sstable (%s)",
outOfOrderRows.size(), sstable, newInOrderSstable));
+ }
+
+ if (newSstable == null)
+ {
+ if (badRows > 0)
+ outputHandler.warn("No valid rows found while scrubbing " + sstable + "; it is marked for deletion now. If you want to attempt manual recovery, you can find a copy in the pre-scrub snapshot");
+ else
+ outputHandler.output("Scrub of " + sstable + " complete;
looks like all " + emptyRows + " rows were tombstoned");
+ }
+ else
+ {
+ outputHandler.output("Scrub of " + sstable + " complete: " +
goodRows + " rows in new sstable and " + emptyRows + " empty (tombstoned) rows
dropped");
+ if (badRows > 0)
+ outputHandler.warn("Unable to recover " + badRows + " rows that were skipped. You can attempt manual recovery from the pre-scrub snapshot. You can also run nodetool repair to transfer the data from a healthy replica, if any");
+ }
+ }
+
+ public SSTableReader getNewSSTable()
+ {
+ return newSstable;
+ }
+
+ public SSTableReader getNewInOrderSSTable()
+ {
+ return newInOrderSstable;
+ }
+
+ private void throwIfFatal(Throwable th)
+ {
+ if (th instanceof Error && !(th instanceof AssertionError || th
instanceof IOError))
+ throw (Error) th;
+ }
+
+ public void close()
+ {
+ FileUtils.closeQuietly(dataFile);
+ FileUtils.closeQuietly(indexFile);
+ }
+
+ public CompactionInfo.Holder getScrubInfo()
+ {
+ return scrubInfo;
+ }
+
+ private static class ScrubInfo extends CompactionInfo.Holder
+ {
+ private final RandomAccessReader dataFile;
+ private final SSTableReader sstable;
+
+ public ScrubInfo(RandomAccessReader dataFile, SSTableReader sstable)
+ {
+ this.dataFile = dataFile;
+ this.sstable = sstable;
+ }
+
+ public CompactionInfo getCompactionInfo()
+ {
+ try
+ {
+ return new CompactionInfo(sstable.metadata,
+ OperationType.SCRUB,
+ dataFile.getFilePointer(),
+ dataFile.length());
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException();
+ }
+ }
+ }
+
+ private static class ScrubController extends CompactionController
+ {
+ public ScrubController(ColumnFamilyStore cfs)
+ {
- super(cfs, Integer.MAX_VALUE, true, new
IntervalTree<SSTableReader>(Collections.<Interval>emptyList()), false);
++ super(cfs, Integer.MAX_VALUE,
DataTracker.buildIntervalTree(Collections.<SSTableReader>emptyList()));
+ }
+
+ @Override
+ public boolean shouldPurge(DecoratedKey key)
+ {
+ return false;
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/dht/Bounds.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 5edd287,0aaa932..680fd8e
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@@ -138,13 -138,29 +138,28 @@@ public class SSTableReader extends SSTa
return open(desc, componentsFor(desc), metadata, p);
}
+ public static SSTableReader openNoValidation(Descriptor descriptor,
Set<Component> components, CFMetaData metadata) throws IOException
+ {
- return open(descriptor, components,
Collections.<DecoratedKey>emptySet(), null, metadata,
StorageService.getPartitioner(), false);
++ return open(descriptor, components, null, metadata,
StorageService.getPartitioner(), false);
+ }
+
public static SSTableReader open(Descriptor descriptor, Set<Component>
components, CFMetaData metadata, IPartitioner partitioner) throws IOException
{
- return open(descriptor, components,
Collections.<DecoratedKey>emptySet(), null, metadata, partitioner);
+ return open(descriptor, components, null, metadata, partitioner);
}
- public static SSTableReader open(Descriptor descriptor, Set<Component>
components, Set<DecoratedKey> savedKeys, DataTracker tracker, CFMetaData
metadata, IPartitioner partitioner) throws IOException
+ public static SSTableReader open(Descriptor descriptor, Set<Component>
components, DataTracker tracker, CFMetaData metadata, IPartitioner partitioner)
throws IOException
{
- return open(descriptor, components, savedKeys, tracker, metadata,
partitioner, true);
++ return open(descriptor, components, tracker, metadata, partitioner,
true);
+ }
+
+ private static SSTableReader open(Descriptor descriptor,
+ Set<Component> components,
- Set<DecoratedKey> savedKeys,
+ DataTracker tracker,
+ CFMetaData metadata,
+ IPartitioner partitioner,
+ boolean validate) throws IOException
+ {
assert partitioner != null;
// Minimum components without which we can't do anything
assert components.contains(Component.DATA);
@@@ -184,9 -200,13 +199,13 @@@
}
else
{
- sstable.load(false, savedKeys);
+ sstable.load(false);
sstable.loadBloomFilter();
}
+
+ if (validate)
+ sstable.validate();
+
if (logger.isDebugEnabled())
logger.debug("INDEX LOAD TIME for " + descriptor + ": " +
(System.currentTimeMillis() - start) + " ms.");
@@@ -389,75 -433,16 +408,81 @@@
// finalize the state of the reader
ifile =
ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
+ if (readIndex) // save summary information to disk
+ saveSummary(this, ibuilder, dbuilder);
+ }
+
+ public static boolean loadSummary(SSTableReader reader,
SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+ {
+ File summariesFile = new
File(reader.descriptor.filenameFor(Component.SUMMARY));
+ if (!summariesFile.exists())
+ return false;
+
+ DataInputStream iStream = null;
+ try
+ {
+ iStream = new DataInputStream(new FileInputStream(summariesFile));
+ reader.indexSummary =
IndexSummary.serializer.deserialize(iStream, reader.partitioner);
+ reader.first = decodeKey(reader.partitioner, reader.descriptor,
ByteBufferUtil.readWithLength(iStream));
+ reader.last = decodeKey(reader.partitioner, reader.descriptor,
ByteBufferUtil.readWithLength(iStream));
+ ibuilder.deserializeBounds(iStream);
+ dbuilder.deserializeBounds(iStream);
+ }
+ catch (IOException e)
+ {
+ logger.debug("Cannot deserialize SSTable Summary: ", e);
+ // corrupted hence delete it and let it load it now.
+ if (summariesFile.exists())
+ summariesFile.delete();
+
+ return false;
+ }
+ finally
+ {
+ FileUtils.closeQuietly(iStream);
+ }
+
+ return true;
+ }
+
+ public static void saveSummary(SSTableReader reader,
SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
+ {
+ File summariesFile = new
File(reader.descriptor.filenameFor(Component.SUMMARY));
+ if (summariesFile.exists())
+ summariesFile.delete();
+
+ DataOutputStream oStream = null;
+ try
+ {
+ oStream = new DataOutputStream(new
FileOutputStream(summariesFile));
+ IndexSummary.serializer.serialize(reader.indexSummary, oStream);
+ ByteBufferUtil.writeWithLength(reader.first.key, oStream);
+ ByteBufferUtil.writeWithLength(reader.last.key, oStream);
+ ibuilder.serializeBounds(oStream);
+ dbuilder.serializeBounds(oStream);
+ }
+ catch (IOException e)
+ {
+ logger.debug("Cannot save SSTable Summary: ", e);
+
+ // corrupted hence delete it and let it load it now.
+ if (summariesFile.exists())
+ summariesFile.delete();
+ }
+ finally
+ {
+ FileUtils.closeQuietly(oStream);
+ }
}
+ private void validate()
+ {
+ if (this.first.compareTo(this.last) > 0)
+ throw new IllegalStateException(String.format("SSTable first key
%s > last key %s", this.first, this.last));
+ }
+
/** get the position in the index file to start scanning to find the
given key (at most indexInterval keys away) */
- private long getIndexScanPosition(RowPosition key)
+ public long getIndexScanPosition(RowPosition key)
{
assert indexSummary.getKeys() != null &&
indexSummary.getKeys().size() > 0;
int index = Collections.binarySearch(indexSummary.getKeys(), key);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 2fc9771,5619951..05f5b25
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@@ -130,11 -127,11 +130,11 @@@ public class SSTableWriter extends SSTa
/**
* Perform sanity checks on @param decoratedKey and @return the position
in the data file before any data is written
*/
- private long beforeAppend(DecoratedKey<?> decoratedKey) throws IOException
+ private long beforeAppend(DecoratedKey decoratedKey) throws IOException
{
assert decoratedKey != null : "Keys must not be null";
- assert lastWrittenKey == null ||
lastWrittenKey.compareTo(decoratedKey) < 0
- : "Last written key " + lastWrittenKey + " >= current key " +
decoratedKey + " writing into " + getFilename();
+ if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey)
>= 0)
+ throw new RuntimeException("Last written key " + lastWrittenKey +
" >= current key " + decoratedKey + " writing into " + getFilename());
return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/BulkLoader.java
index 8465fb6,ace37db..24b2423
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@@ -173,11 -176,11 +175,11 @@@ public class BulkLoade
static class ExternalClient extends SSTableLoader.Client
{
private final Map<String, Set<String>> knownCfs = new HashMap<String,
Set<String>>();
- private final SSTableLoader.OutputHandler outputHandler;
+ private final OutputHandler outputHandler;
- private Set<InetAddress> hosts = new HashSet<InetAddress>();
- private int rpcPort;
+ private final Set<InetAddress> hosts;
+ private final int rpcPort;
- public ExternalClient(SSTableLoader.OutputHandler outputHandler,
Set<InetAddress> hosts, int port)
+ public ExternalClient(OutputHandler outputHandler, Set<InetAddress>
hosts, int port)
{
super();
this.outputHandler = outputHandler;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------