Updated Branches:
  refs/heads/trunk b8578df60 -> 3eff5455d

Merge branch 'cassandra-1.1' into trunk

Conflicts:
        CHANGES.txt
        src/java/org/apache/cassandra/db/ColumnFamilyStore.java
        src/java/org/apache/cassandra/db/Table.java
        src/java/org/apache/cassandra/db/compaction/CompactionController.java
        src/java/org/apache/cassandra/db/compaction/CompactionManager.java
        src/java/org/apache/cassandra/io/sstable/SSTableReader.java
        src/java/org/apache/cassandra/service/StorageService.java
        src/java/org/apache/cassandra/tools/BulkLoader.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3eff5455
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3eff5455
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3eff5455

Branch: refs/heads/trunk
Commit: 3eff5455dc58e6b7e0aadf987a635320f4bd5ef1
Parents: b8578df 09e5443
Author: Sylvain Lebresne <sylv...@datastax.com>
Authored: Fri Jun 29 11:18:46 2012 +0200
Committer: Sylvain Lebresne <sylv...@datastax.com>
Committed: Fri Jun 29 11:18:46 2012 +0200

----------------------------------------------------------------------
 CHANGES.txt                                        |    3 +
 bin/sstablescrub                                   |   50 ++
 .../org/apache/cassandra/db/ColumnFamilyStore.java |   28 +-
 src/java/org/apache/cassandra/db/DefsTable.java    |    2 +-
 src/java/org/apache/cassandra/db/Table.java        |   20 +-
 .../db/compaction/CompactionController.java        |   12 +-
 .../cassandra/db/compaction/CompactionManager.java |  228 +---------
 .../cassandra/db/compaction/LeveledManifest.java   |  100 +++--
 .../apache/cassandra/db/compaction/Scrubber.java   |  357 +++++++++++++++
 src/java/org/apache/cassandra/dht/Bounds.java      |    6 +
 .../apache/cassandra/hadoop/BulkRecordWriter.java  |    6 +-
 .../apache/cassandra/io/sstable/SSTableLoader.java |   12 +-
 .../apache/cassandra/io/sstable/SSTableReader.java |   25 +
 .../apache/cassandra/io/sstable/SSTableWriter.java |    4 +-
 .../apache/cassandra/service/StorageService.java   |    9 +-
 .../org/apache/cassandra/tools/BulkLoader.java     |   24 +-
 .../apache/cassandra/tools/StandaloneScrubber.java |  282 ++++++++++++
 .../org/apache/cassandra/utils/OutputHandler.java  |   95 ++++
 test/unit/org/apache/cassandra/db/ScrubTest.java   |   70 +++-
 19 files changed, 1017 insertions(+), 316 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/CHANGES.txt
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index dc2459b,a40e52e..a92665e
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -222,11 -227,16 +223,16 @@@ public class ColumnFamilyStore implemen
  
          // scan for sstables corresponding to this cf and load them
          data = new DataTracker(this);
-         Directories.SSTableLister sstables = 
directories.sstableLister().skipCompacted(true).skipTemporary(true);
-         
data.addInitialSSTables(SSTableReader.batchOpen(sstables.list().entrySet(), 
data, metadata, this.partitioner));
 -        Set<DecoratedKey> savedKeys = caching == Caching.NONE || caching == 
Caching.ROWS_ONLY
 -                                       ? Collections.<DecoratedKey>emptySet()
 -                                       : 
CacheService.instance.keyCache.readSaved(table.name, columnFamily, partitioner);
+ 
+         if (loadSSTables)
+         {
+             Directories.SSTableLister sstables = 
directories.sstableLister().skipCompacted(true).skipTemporary(true);
 -            
data.addInitialSSTables(SSTableReader.batchOpen(sstables.list().entrySet(), 
savedKeys, data, metadata, this.partitioner));
++            
data.addInitialSSTables(SSTableReader.batchOpen(sstables.list().entrySet(), 
data, metadata, this.partitioner));
+         }
+ 
 +        if (caching == Caching.ALL || caching == Caching.KEYS_ONLY)
 +            CacheService.instance.keyCache.loadSaved(this);
 +
          // compaction strategy should be created after the CFS has been 
prepared
          this.compactionStrategy = 
metadata.createCompactionStrategyInstance(this);
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/db/DefsTable.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/db/Table.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Table.java
index 388679a,f3a414e..60f2dda
--- a/src/java/org/apache/cassandra/db/Table.java
+++ b/src/java/org/apache/cassandra/db/Table.java
@@@ -304,7 -347,7 +308,7 @@@ public class Tabl
      }
  
      /** adds a cf to internal structures, ends up creating disk files). */
-     public void initCf(UUID cfId, String cfName)
 -    public void initCf(Integer cfId, String cfName, boolean loadSSTables)
++    public void initCf(UUID cfId, String cfName, boolean loadSSTables)
      {
          if (columnFamilyStores.containsKey(cfId))
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/db/compaction/CompactionController.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionController.java
index c91684f,1a90ab0..4b42ed4
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@@ -63,6 -66,19 +63,15 @@@ public class CompactionControlle
  
      public CompactionController(ColumnFamilyStore cfs, 
Collection<SSTableReader> sstables, int gcBefore, boolean forceDeserialize)
      {
+         this(cfs,
+              gcBefore,
 -             forceDeserialize || !allLatestVersion(sstables),
 -             
DataTracker.buildIntervalTree(cfs.getOverlappingSSTables(sstables)),
 -             
cfs.getCompactionStrategy().isKeyExistenceExpensive(ImmutableSet.copyOf(sstables)));
++             
DataTracker.buildIntervalTree(cfs.getOverlappingSSTables(sstables)));
+     }
+ 
+     protected CompactionController(ColumnFamilyStore cfs,
+                                    int gcBefore,
 -                                   boolean deserializeRequired,
 -                                   IntervalTree<SSTableReader> 
overlappingTree,
 -                                   boolean keyExistenceIsExpensive)
++                                   DataTracker.SSTableIntervalTree 
overlappingTree)
+     {
          assert cfs != null;
          this.cfs = cfs;
          this.gcBefore = gcBefore;
@@@ -71,8 -87,9 +80,7 @@@
          // add 5 minutes to be sure we're on the safe side in terms of thread 
safety (though we should be fine in our
          // current 'stop all write during memtable switch' situation).
          this.mergeShardBefore = (int) ((cfs.oldestUnflushedMemtable() + 5 * 
3600) / 1000);
-         Set<SSTableReader> overlappingSSTables = 
cfs.getOverlappingSSTables(sstables);
-         overlappingTree = DataTracker.buildIntervalTree(overlappingSSTables);
 -        this.deserializeRequired = deserializeRequired;
+         this.overlappingTree = overlappingTree;
 -        this.keyExistenceIsExpensive = keyExistenceIsExpensive;
      }
  
      public String getKeyspace()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/db/compaction/LeveledManifest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/db/compaction/Scrubber.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/Scrubber.java
index 0000000,314a873..aeff46f
mode 000000,100644..100644
--- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java
+++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java
@@@ -1,0 -1,355 +1,357 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.cassandra.db.compaction;
+ 
+ import java.nio.ByteBuffer;
+ import java.io.*;
+ import java.util.*;
+ 
+ import org.apache.cassandra.config.DatabaseDescriptor;
+ import org.apache.cassandra.db.*;
+ import org.apache.cassandra.io.sstable.*;
+ import org.apache.cassandra.io.util.FileUtils;
+ import org.apache.cassandra.io.util.RandomAccessReader;
+ import org.apache.cassandra.utils.ByteBufferUtil;
+ import org.apache.cassandra.utils.FBUtilities;
+ import org.apache.cassandra.utils.IntervalTree.*;
+ import org.apache.cassandra.utils.OutputHandler;
+ 
+ public class Scrubber implements Closeable
+ {
+     public final ColumnFamilyStore cfs;
+     public final SSTableReader sstable;
+     public final File destination;
+ 
+     private final CompactionController controller;
+     private final boolean isCommutative;
+     private final int expectedBloomFilterSize;
+ 
+     private final RandomAccessReader dataFile;
+     private final RandomAccessReader indexFile;
+     private final ScrubInfo scrubInfo;
+ 
+     private long rowsRead;
+ 
+     private SSTableWriter writer;
+     private SSTableReader newSstable;
+     private SSTableReader newInOrderSstable;
+ 
+     private int goodRows;
+     private int badRows;
+     private int emptyRows;
+ 
+     private final OutputHandler outputHandler;
+ 
+     private static final Comparator<AbstractCompactedRow> acrComparator = new 
Comparator<AbstractCompactedRow>()
+     {
+          public int compare(AbstractCompactedRow r1, AbstractCompactedRow r2)
+          {
+              return r1.key.compareTo(r2.key);
+          }
+     };
+     private final Set<AbstractCompactedRow> outOfOrderRows = new 
TreeSet<AbstractCompactedRow>(acrComparator);
+ 
+     public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable) throws 
IOException
+     {
+         this(cfs, sstable, new OutputHandler.LogOutput(), false);
+     }
+ 
+     public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, 
OutputHandler outputHandler, boolean isOffline) throws IOException
+     {
+         this.cfs = cfs;
+         this.sstable = sstable;
+         this.outputHandler = outputHandler;
+ 
+         // Calculate the expected compacted filesize
+         this.destination = 
cfs.directories.getDirectoryForNewSSTables(sstable.onDiskLength());
+         if (destination == null)
+             throw new IOException("disk full");
+ 
+         List<SSTableReader> toScrub = Collections.singletonList(sstable);
+         // If we run scrub offline, we should never purge tombstone, as we 
cannot know if other sstable have data that the tombstone deletes.
+         this.controller = isOffline
+                         ? new ScrubController(cfs)
+                         : new CompactionController(cfs, 
Collections.singletonList(sstable), CompactionManager.getDefaultGcBefore(cfs), 
true);
+         this.isCommutative = 
cfs.metadata.getDefaultValidator().isCommutative();
+         this.expectedBloomFilterSize = 
Math.max(DatabaseDescriptor.getIndexInterval(), 
(int)(SSTableReader.getApproximateKeyCount(toScrub)));
+ 
+         // loop through each row, deserializing to check for damage.
+         // we'll also loop through the index at the same time, using the 
position from the index to recover if the
+         // row header (key or data size) is corrupt. (This means our position 
in the index file will be one row
+         // "ahead" of the data file.)
+         this.dataFile = sstable.openDataReader(true);
+         this.indexFile = RandomAccessReader.open(new 
File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
+         this.scrubInfo = new ScrubInfo(dataFile, sstable);
+     }
+ 
+     public void scrub() throws IOException
+     {
+         outputHandler.output("Scrubbing " + sstable);
+         try
+         {
+             ByteBuffer nextIndexKey = 
ByteBufferUtil.readWithShortLength(indexFile);
+             {
+                 // throw away variable so we don't have a side effect in the 
assert
 -                long firstRowPositionFromIndex = indexFile.readLong();
++                long firstRowPositionFromIndex = 
RowIndexEntry.serializer.deserialize(indexFile, 
sstable.descriptor.version).position;
+                 assert firstRowPositionFromIndex == 0 : 
firstRowPositionFromIndex;
+             }
+ 
+             // TODO errors when creating the writer may leave empty temp 
files.
+             writer = CompactionManager.maybeCreateWriter(cfs, destination, 
expectedBloomFilterSize, null, Collections.singletonList(sstable));
+ 
+             AbstractCompactedRow prevRow = null;
+ 
+             while (!dataFile.isEOF())
+             {
+                 if (scrubInfo.isStopRequested())
+                     throw new 
CompactionInterruptedException(scrubInfo.getCompactionInfo());
+                 long rowStart = dataFile.getFilePointer();
+                 outputHandler.debug("Reading row at " + rowStart);
+ 
+                 DecoratedKey key = null;
+                 long dataSize = -1;
+                 try
+                 {
+                     key = SSTableReader.decodeKey(sstable.partitioner, 
sstable.descriptor, ByteBufferUtil.readWithShortLength(dataFile));
 -                    dataSize = sstable.descriptor.hasIntRowSize ? 
dataFile.readInt() : dataFile.readLong();
++                    dataSize = sstable.descriptor.version.hasIntRowSize ? 
dataFile.readInt() : dataFile.readLong();
+                     outputHandler.debug(String.format("row %s is %s bytes", 
ByteBufferUtil.bytesToHex(key.key), dataSize));
+                 }
+                 catch (Throwable th)
+                 {
+                     throwIfFatal(th);
+                     // check for null key below
+                 }
+ 
+                 ByteBuffer currentIndexKey = nextIndexKey;
+                 long nextRowPositionFromIndex;
+                 try
+                 {
+                     nextIndexKey = indexFile.isEOF() ? null : 
ByteBufferUtil.readWithShortLength(indexFile);
 -                    nextRowPositionFromIndex = indexFile.isEOF() ? 
dataFile.length() : indexFile.readLong();
++                    nextRowPositionFromIndex = indexFile.isEOF()
++                                             ? dataFile.length()
++                                             : 
RowIndexEntry.serializer.deserialize(indexFile, 
sstable.descriptor.version).position;
+                 }
+                 catch (Throwable th)
+                 {
+                     outputHandler.warn("Error reading index file", th);
+                     nextIndexKey = null;
+                     nextRowPositionFromIndex = dataFile.length();
+                 }
+ 
+                 long dataStart = dataFile.getFilePointer();
+                 long dataStartFromIndex = currentIndexKey == null
+                                         ? -1
 -                                        : rowStart + 2 + 
currentIndexKey.remaining() + (sstable.descriptor.hasIntRowSize ? 4 : 8);
++                                        : rowStart + 2 + 
currentIndexKey.remaining() + (sstable.descriptor.version.hasIntRowSize ? 4 : 
8);
+                 long dataSizeFromIndex = nextRowPositionFromIndex - 
dataStartFromIndex;
+                 assert currentIndexKey != null || indexFile.isEOF();
+                 if (currentIndexKey != null)
+                     outputHandler.debug(String.format("Index doublecheck: row 
%s is %s bytes", ByteBufferUtil.bytesToHex(currentIndexKey),  
dataSizeFromIndex));
+ 
+                 writer.mark();
+                 try
+                 {
+                     if (key == null)
+                         throw new IOError(new IOException("Unable to read row 
key from data file"));
+                     if (dataSize > dataFile.length())
+                         throw new IOError(new IOException("Impossible row 
size " + dataSize));
+                     SSTableIdentityIterator row = new 
SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
+                     AbstractCompactedRow compactedRow = 
controller.getCompactedRow(row);
+                     if (compactedRow.isEmpty())
+                     {
+                         emptyRows++;
+                     }
+                     else
+                     {
+                         if (prevRow != null && acrComparator.compare(prevRow, 
compactedRow) > 0)
+                         {
+                             outOfOrderRows.add(compactedRow);
+                             outputHandler.warn(String.format("Out of order 
row detected (%s found after %s)", compactedRow.key, prevRow.key));
+                             continue;
+                         }
+ 
+                         writer.append(compactedRow);
+                         prevRow = compactedRow;
+                         goodRows++;
+                     }
+                     if (!key.key.equals(currentIndexKey) || dataStart != 
dataStartFromIndex)
+                         outputHandler.warn("Index file contained a different 
key or row size; using key from data file");
+                 }
+                 catch (Throwable th)
+                 {
+                     throwIfFatal(th);
+                     outputHandler.warn("Non-fatal error reading row 
(stacktrace follows)", th);
+                     writer.resetAndTruncate();
+ 
+                     if (currentIndexKey != null
+                         && (key == null || !key.key.equals(currentIndexKey) 
|| dataStart != dataStartFromIndex || dataSize != dataSizeFromIndex))
+                     {
+                         outputHandler.output(String.format("Retrying from row 
index; data is %s bytes starting at %s",
+                                                   dataSizeFromIndex, 
dataStartFromIndex));
+                         key = SSTableReader.decodeKey(sstable.partitioner, 
sstable.descriptor, currentIndexKey);
+                         try
+                         {
+                             SSTableIdentityIterator row = new 
SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, 
dataSizeFromIndex, true);
+                             AbstractCompactedRow compactedRow = 
controller.getCompactedRow(row);
+                             if (compactedRow.isEmpty())
+                             {
+                                 emptyRows++;
+                             }
+                             else
+                             {
+                                 if (prevRow != null && 
acrComparator.compare(prevRow, compactedRow) > 0)
+                                 {
+                                     outOfOrderRows.add(compactedRow);
+                                     outputHandler.warn(String.format("Out of 
order row detected (%s found after %s)", compactedRow.key, prevRow.key));
+                                     continue;
+                                 }
+                                 writer.append(compactedRow);
+                                 prevRow = compactedRow;
+                                 goodRows++;
+                             }
+                         }
+                         catch (Throwable th2)
+                         {
+                             throwIfFatal(th2);
+                             // Skipping rows is dangerous for counters (see 
CASSANDRA-2759)
+                             if (isCommutative)
+                                 throw new IOError(th2);
+ 
+                             outputHandler.warn("Retry failed too. Skipping to 
next row (retry's stacktrace follows)", th2);
+                             writer.resetAndTruncate();
+                             dataFile.seek(nextRowPositionFromIndex);
+                             badRows++;
+                         }
+                     }
+                     else
+                     {
+                         // Skipping rows is dangerous for counters (see 
CASSANDRA-2759)
+                         if (isCommutative)
+                             throw new IOError(th);
+ 
+                         outputHandler.warn("Row at " + dataStart + " is 
unreadable; skipping to next");
+                         if (currentIndexKey != null)
+                             dataFile.seek(nextRowPositionFromIndex);
+                         badRows++;
+                     }
+                 }
+                 if ((rowsRead++ % 1000) == 0)
+                     controller.mayThrottle(dataFile.getFilePointer());
+             }
+ 
+             if (writer.getFilePointer() > 0)
+                 newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
+         }
+         catch (Exception e)
+         {
+             if (writer != null)
+                 writer.abort();
+             throw FBUtilities.unchecked(e);
+         }
+ 
+         if (!outOfOrderRows.isEmpty())
+         {
+             SSTableWriter inOrderWriter = 
CompactionManager.maybeCreateWriter(cfs, destination, expectedBloomFilterSize, 
null, Collections.singletonList(sstable));
+             for (AbstractCompactedRow row : outOfOrderRows)
+                 inOrderWriter.append(row);
+             newInOrderSstable = 
inOrderWriter.closeAndOpenReader(sstable.maxDataAge);
+             outputHandler.warn(String.format("%d out of order rows found 
while scrubbing %s; Those have been written (in order) to a new sstable (%s)", 
outOfOrderRows.size(), sstable, newInOrderSstable));
+         }
+ 
+         if (newSstable == null)
+         {
+             if (badRows > 0)
+                 outputHandler.warn("No valid rows found while scrubbing " + 
sstable + "; it is marked for deletion now. If you want to attempt manual 
recovery, you can find a copy in the pre-scrub snapshot");
+             else
+                 outputHandler.output("Scrub of " + sstable + " complete; 
looks like all " + emptyRows + " rows were tombstoned");
+         }
+         else
+         {
+             outputHandler.output("Scrub of " + sstable + " complete: " + 
goodRows + " rows in new sstable and " + emptyRows + " empty (tombstoned) rows 
dropped");
+             if (badRows > 0)
+                 outputHandler.warn("Unable to recover " + badRows + " rows 
that were skipped.  You can attempt manual recovery from the pre-scrub 
snapshot.  You can also run nodetool repair to transfer the data from a healthy 
replica, if any");
+         }
+     }
+ 
+     public SSTableReader getNewSSTable()
+     {
+         return newSstable;
+     }
+ 
+     public SSTableReader getNewInOrderSSTable()
+     {
+         return newInOrderSstable;
+     }
+ 
+     private void throwIfFatal(Throwable th)
+     {
+         if (th instanceof Error && !(th instanceof AssertionError || th 
instanceof IOError))
+             throw (Error) th;
+     }
+ 
+     public void close()
+     {
+         FileUtils.closeQuietly(dataFile);
+         FileUtils.closeQuietly(indexFile);
+     }
+ 
+     public CompactionInfo.Holder getScrubInfo()
+     {
+         return scrubInfo;
+     }
+ 
+     private static class ScrubInfo extends CompactionInfo.Holder
+     {
+         private final RandomAccessReader dataFile;
+         private final SSTableReader sstable;
+ 
+         public ScrubInfo(RandomAccessReader dataFile, SSTableReader sstable)
+         {
+             this.dataFile = dataFile;
+             this.sstable = sstable;
+         }
+ 
+         public CompactionInfo getCompactionInfo()
+         {
+             try
+             {
+                 return new CompactionInfo(sstable.metadata,
+                                           OperationType.SCRUB,
+                                           dataFile.getFilePointer(),
+                                           dataFile.length());
+             }
+             catch (Exception e)
+             {
+                 throw new RuntimeException();
+             }
+         }
+     }
+ 
+     private static class ScrubController extends CompactionController
+     {
+         public ScrubController(ColumnFamilyStore cfs)
+         {
 -            super(cfs, Integer.MAX_VALUE, true, new 
IntervalTree<SSTableReader>(Collections.<Interval>emptyList()), false);
++            super(cfs, Integer.MAX_VALUE, 
DataTracker.buildIntervalTree(Collections.<SSTableReader>emptyList()));
+         }
+ 
+         @Override
+         public boolean shouldPurge(DecoratedKey key)
+         {
+             return false;
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/dht/Bounds.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/hadoop/BulkRecordWriter.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/io/sstable/SSTableLoader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 5edd287,0aaa932..680fd8e
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@@ -138,13 -138,29 +138,28 @@@ public class SSTableReader extends SSTa
          return open(desc, componentsFor(desc), metadata, p);
      }
  
+     public static SSTableReader openNoValidation(Descriptor descriptor, 
Set<Component> components, CFMetaData metadata) throws IOException
+     {
 -        return open(descriptor, components, 
Collections.<DecoratedKey>emptySet(), null, metadata, 
StorageService.getPartitioner(), false);
++        return open(descriptor, components, null, metadata, 
StorageService.getPartitioner(), false);
+     }
+ 
      public static SSTableReader open(Descriptor descriptor, Set<Component> 
components, CFMetaData metadata, IPartitioner partitioner) throws IOException
      {
 -        return open(descriptor, components, 
Collections.<DecoratedKey>emptySet(), null, metadata, partitioner);
 +        return open(descriptor, components, null, metadata, partitioner);
      }
  
 -    public static SSTableReader open(Descriptor descriptor, Set<Component> 
components, Set<DecoratedKey> savedKeys, DataTracker tracker, CFMetaData 
metadata, IPartitioner partitioner) throws IOException
 +    public static SSTableReader open(Descriptor descriptor, Set<Component> 
components, DataTracker tracker, CFMetaData metadata, IPartitioner partitioner) 
throws IOException
      {
 -        return open(descriptor, components, savedKeys, tracker, metadata, 
partitioner, true);
++        return open(descriptor, components, tracker, metadata, partitioner, 
true);
+     }
+ 
+     private static SSTableReader open(Descriptor descriptor,
+                                       Set<Component> components,
 -                                      Set<DecoratedKey> savedKeys,
+                                       DataTracker tracker,
+                                       CFMetaData metadata,
+                                       IPartitioner partitioner,
+                                       boolean validate) throws IOException
+     {
          assert partitioner != null;
          // Minimum components without which we can't do anything
          assert components.contains(Component.DATA);
@@@ -184,9 -200,13 +199,13 @@@
          }
          else
          {
 -            sstable.load(false, savedKeys);
 +            sstable.load(false);
              sstable.loadBloomFilter();
          }
+ 
+         if (validate)
+             sstable.validate();
+ 
          if (logger.isDebugEnabled())
              logger.debug("INDEX LOAD TIME for " + descriptor + ": " + 
(System.currentTimeMillis() - start) + " ms.");
  
@@@ -389,75 -433,16 +408,81 @@@
          // finalize the state of the reader
          ifile = 
ibuilder.complete(descriptor.filenameFor(Component.PRIMARY_INDEX));
          dfile = dbuilder.complete(descriptor.filenameFor(Component.DATA));
 +        if (readIndex) // save summary information to disk
 +            saveSummary(this, ibuilder, dbuilder);
 +    }
 +
 +    public static boolean loadSummary(SSTableReader reader, 
SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
 +    {
 +        File summariesFile = new 
File(reader.descriptor.filenameFor(Component.SUMMARY));
 +        if (!summariesFile.exists())
 +            return false;
 +
 +        DataInputStream iStream = null;
 +        try
 +        {
 +            iStream = new DataInputStream(new FileInputStream(summariesFile));
 +            reader.indexSummary = 
IndexSummary.serializer.deserialize(iStream, reader.partitioner);
 +            reader.first = decodeKey(reader.partitioner, reader.descriptor, 
ByteBufferUtil.readWithLength(iStream));
 +            reader.last = decodeKey(reader.partitioner, reader.descriptor, 
ByteBufferUtil.readWithLength(iStream));
 +            ibuilder.deserializeBounds(iStream);
 +            dbuilder.deserializeBounds(iStream);
 +        }
 +        catch (IOException e)
 +        {
 +            logger.debug("Cannot deserialize SSTable Summary: ", e);
 +            // corrupted hence delete it and let it load it now.
 +            if (summariesFile.exists())
 +                summariesFile.delete();
 +
 +            return false;
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(iStream);
 +        }
 +
 +        return true;
 +    }
 +
 +    public static void saveSummary(SSTableReader reader, 
SegmentedFile.Builder ibuilder, SegmentedFile.Builder dbuilder)
 +    {
 +        File summariesFile = new 
File(reader.descriptor.filenameFor(Component.SUMMARY));
 +        if (summariesFile.exists())
 +            summariesFile.delete();
 +
 +        DataOutputStream oStream = null;
 +        try
 +        {
 +            oStream = new DataOutputStream(new 
FileOutputStream(summariesFile));
 +            IndexSummary.serializer.serialize(reader.indexSummary, oStream);
 +            ByteBufferUtil.writeWithLength(reader.first.key, oStream);
 +            ByteBufferUtil.writeWithLength(reader.last.key, oStream);
 +            ibuilder.serializeBounds(oStream);
 +            dbuilder.serializeBounds(oStream);
 +        }
 +        catch (IOException e)
 +        {
 +            logger.debug("Cannot save SSTable Summary: ", e);
 +
 +            // corrupted hence delete it and let it load it now.
 +            if (summariesFile.exists())
 +                summariesFile.delete();
 +        }
 +        finally
 +        {
 +            FileUtils.closeQuietly(oStream);
 +        }
      }
  
+     private void validate()
+     {
+         if (this.first.compareTo(this.last) > 0)
+             throw new IllegalStateException(String.format("SSTable first key 
%s > last key %s", this.first, this.last));
+     }
+ 
      /** get the position in the index file to start scanning to find the 
given key (at most indexInterval keys away) */
 -    private long getIndexScanPosition(RowPosition key)
 +    public long getIndexScanPosition(RowPosition key)
      {
          assert indexSummary.getKeys() != null && 
indexSummary.getKeys().size() > 0;
          int index = Collections.binarySearch(indexSummary.getKeys(), key);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
index 2fc9771,5619951..05f5b25
--- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
@@@ -130,11 -127,11 +130,11 @@@ public class SSTableWriter extends SSTa
      /**
       * Perform sanity checks on @param decoratedKey and @return the position in the data file before any data is written
       */
 -    private long beforeAppend(DecoratedKey<?> decoratedKey) throws IOException
 +    private long beforeAppend(DecoratedKey decoratedKey) throws IOException
      {
          assert decoratedKey != null : "Keys must not be null";
-         assert lastWrittenKey == null || lastWrittenKey.compareTo(decoratedKey) < 0
-                : "Last written key " + lastWrittenKey + " >= current key " + decoratedKey + " writing into " + getFilename();
+         if (lastWrittenKey != null && lastWrittenKey.compareTo(decoratedKey) >= 0)
+             throw new RuntimeException("Last written key " + lastWrittenKey + " >= current key " + decoratedKey + " writing into " + getFilename());
          return (lastWrittenKey == null) ? 0 : dataFile.getFilePointer();
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/src/java/org/apache/cassandra/tools/BulkLoader.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/BulkLoader.java
index 8465fb6,ace37db..24b2423
--- a/src/java/org/apache/cassandra/tools/BulkLoader.java
+++ b/src/java/org/apache/cassandra/tools/BulkLoader.java
@@@ -173,11 -176,11 +175,11 @@@ public class BulkLoade
      static class ExternalClient extends SSTableLoader.Client
      {
          private final Map<String, Set<String>> knownCfs = new HashMap<String, Set<String>>();
-         private final SSTableLoader.OutputHandler outputHandler;
+         private final OutputHandler outputHandler;
 -        private Set<InetAddress> hosts = new HashSet<InetAddress>();
 -        private int rpcPort;
 +        private final Set<InetAddress> hosts;
 +        private final int rpcPort;
  
-         public ExternalClient(SSTableLoader.OutputHandler outputHandler, Set<InetAddress> hosts, int port)
+         public ExternalClient(OutputHandler outputHandler, Set<InetAddress> hosts, int port)
          {
              super();
              this.outputHandler = outputHandler;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3eff5455/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------

Reply via email to