Merge branch 'cassandra-2.1' into cassandra-2.2

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/897ffe87
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/897ffe87
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/897ffe87

Branch: refs/heads/cassandra-3.5
Commit: 897ffe87e41ab128c9e8969d535cb2a706baf563
Parents: 6c1ef2b 8b8a3f5
Author: Marcus Eriksson <marc...@apache.org>
Authored: Tue Mar 29 10:54:45 2016 +0200
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Tue Mar 29 10:54:45 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 16 ++++----
 .../db/compaction/CompactionManager.java        | 40 +++++++++++++-------
 .../cassandra/service/StorageService.java       | 24 +++++++++---
 .../cassandra/service/StorageServiceMBean.java  |  6 +++
 .../org/apache/cassandra/tools/NodeProbe.java   | 34 +++++++++++------
 .../cassandra/tools/nodetool/Cleanup.java       |  8 +++-
 .../apache/cassandra/tools/nodetool/Scrub.java  |  7 +++-
 .../tools/nodetool/UpgradeSSTable.java          |  7 +++-
 .../org/apache/cassandra/db/CleanupTest.java    |  6 +--
 .../unit/org/apache/cassandra/db/ScrubTest.java | 20 +++++-----
 .../LeveledCompactionStrategyTest.java          |  2 +-
 12 files changed, 117 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/897ffe87/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 91dc588,7794d4f..098d062
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,39 -1,4 +1,41 @@@
 -2.1.14
 +2.2.6
 + * cqlsh: COPY FROM should check that explicit column names are valid (CASSANDRA-11333)
 + * Add -Dcassandra.start_gossip startup option (CASSANDRA-10809)
 + * Fix UTF8Validator.validate() for modified UTF-8 (CASSANDRA-10748)
 + * Clarify that now() function is calculated on the coordinator node in CQL documentation (CASSANDRA-10900)
 + * Fix bloom filter sizing with LCS (CASSANDRA-11344)
 + * (cqlsh) Fix error when result is 0 rows with EXPAND ON (CASSANDRA-11092)
 + * Fix intra-node serialization issue for multicolumn-restrictions (CASSANDRA-11196)
 + * Non-obsoleting compaction operations over compressed files can impose rate limit on normal reads (CASSANDRA-11301)
 + * Add missing newline at end of bin/cqlsh (CASSANDRA-11325)
 + * Fix AE in nodetool cfstats (backport CASSANDRA-10859) (CASSANDRA-11297)
 + * Unresolved hostname leads to replace being ignored (CASSANDRA-11210)
 + * Fix filtering on non-primary key columns for thrift static column families
 +   (CASSANDRA-6377)
 + * Only log yaml config once, at startup (CASSANDRA-11217)
 + * Preserve order for preferred SSL cipher suites (CASSANDRA-11164)
 + * Reference leak with parallel repairs on the same table (CASSANDRA-11215)
 + * Range.compareTo() violates the contract of Comparable (CASSANDRA-11216)
 + * Avoid NPE when serializing ErrorMessage with null message (CASSANDRA-11167)
 + * Replacing an aggregate with a new version doesn't reset INITCOND (CASSANDRA-10840)
 + * (cqlsh) cqlsh cannot be called through symlink (CASSANDRA-11037)
 + * fix ohc and java-driver pom dependencies in build.xml (CASSANDRA-10793)
 + * Protect from keyspace dropped during repair (CASSANDRA-11065)
 + * Handle adding fields to a UDT in SELECT JSON and toJson() (CASSANDRA-11146)
 + * Better error message for cleanup (CASSANDRA-10991)
 + * cqlsh pg-style-strings broken if line ends with ';' (CASSANDRA-11123)
 + * Use cloned TokenMetadata in size estimates to avoid race against membership check
 +   (CASSANDRA-10736)
 + * Always persist upsampled index summaries (CASSANDRA-10512)
 + * (cqlsh) Fix inconsistent auto-complete (CASSANDRA-10733)
 + * Make SELECT JSON and toJson() threadsafe (CASSANDRA-11048)
 + * Fix SELECT on tuple relations for mixed ASC/DESC clustering order (CASSANDRA-7281)
 + * (cqlsh) Support utf-8/cp65001 encoding on Windows (CASSANDRA-11030)
 + * Fix paging on DISTINCT queries repeats result when first row in partition changes
 +   (CASSANDRA-10010)
 +Merged from 2.1:
++ * Add a -j parameter to scrub/cleanup/upgradesstables to state how
++   many threads to use (CASSANDRA-11179)
   * Backport CASSANDRA-10679 (CASSANDRA-9598)
   * Don't do defragmentation if reading from repaired sstables (CASSANDRA-10342)
   * Fix streaming_socket_timeout_in_ms not enforced (CASSANDRA-11286)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/897ffe87/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index da4a84a,3d66d3a..09f58ac
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -1499,74 -1478,22 +1499,74 @@@ public class ColumnFamilyStore implemen
          return maxFile;
      }
  
-     public CompactionManager.AllSSTableOpStatus forceCleanup() throws 
ExecutionException, InterruptedException
+     public CompactionManager.AllSSTableOpStatus forceCleanup(int jobs) throws 
ExecutionException, InterruptedException
      {
-         return 
CompactionManager.instance.performCleanup(ColumnFamilyStore.this);
+         return 
CompactionManager.instance.performCleanup(ColumnFamilyStore.this, jobs);
      }
  
-     public CompactionManager.AllSSTableOpStatus scrub(boolean 
disableSnapshot, boolean skipCorrupted, boolean checkData) throws 
ExecutionException, InterruptedException
+     public CompactionManager.AllSSTableOpStatus scrub(boolean 
disableSnapshot, boolean skipCorrupted, boolean checkData, int jobs) throws 
ExecutionException, InterruptedException
      {
-         return scrub(disableSnapshot, skipCorrupted, false, checkData);
++        return scrub(disableSnapshot, skipCorrupted, false, checkData, jobs);
 +    }
 +
 +    @VisibleForTesting
-     public CompactionManager.AllSSTableOpStatus scrub(boolean 
disableSnapshot, boolean skipCorrupted, boolean alwaysFail, boolean checkData) 
throws ExecutionException, InterruptedException
++    public CompactionManager.AllSSTableOpStatus scrub(boolean 
disableSnapshot, boolean skipCorrupted, boolean alwaysFail, boolean checkData, 
int jobs) throws ExecutionException, InterruptedException
 +    {
          // skip snapshot creation during scrub, SEE JIRA 5891
          if(!disableSnapshot)
              snapshotWithoutFlush("pre-scrub-" + System.currentTimeMillis());
 -        return 
CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted, 
checkData, jobs);
 +
 +        try
 +        {
-             return 
CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted, 
checkData);
++            return 
CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted, 
checkData, jobs);
 +        }
 +        catch(Throwable t)
 +        {
 +            if (!rebuildOnFailedScrub(t))
 +                throw t;
 +
 +            return alwaysFail ? CompactionManager.AllSSTableOpStatus.ABORTED 
: CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
 +        }
 +    }
 +
 +    /**
 +     * CASSANDRA-5174 : For an index cfs we may be able to discard everything 
and just rebuild
 +     * the index when a scrub fails.
 +     *
 +     * @return true if we are an index cfs and we successfully rebuilt the 
index
 +     */
 +    public boolean rebuildOnFailedScrub(Throwable failure)
 +    {
 +        if (!isIndex())
 +            return false;
 +
 +        SecondaryIndex index = null;
 +        if 
(metadata.cfName.contains(Directories.SECONDARY_INDEX_NAME_SEPARATOR))
 +        {
 +            String[] parts = metadata.cfName.split("\\" + 
Directories.SECONDARY_INDEX_NAME_SEPARATOR, 2);
 +            ColumnFamilyStore parentCfs = 
keyspace.getColumnFamilyStore(parts[0]);
 +            index = parentCfs.indexManager.getIndexByName(metadata.cfName);
 +            assert index != null;
 +        }
 +
 +        if (index == null)
 +            return false;
 +
 +        truncateBlocking();
 +
 +        logger.warn("Rebuilding index for {} because of <{}>", name, 
failure.getMessage());
 +        index.getBaseCfs().rebuildSecondaryIndex(index.getIndexName());
 +        return true;
 +    }
 +
 +    public CompactionManager.AllSSTableOpStatus verify(boolean 
extendedVerify) throws ExecutionException, InterruptedException
 +    {
 +        return 
CompactionManager.instance.performVerify(ColumnFamilyStore.this, 
extendedVerify);
      }
  
-     public CompactionManager.AllSSTableOpStatus sstablesRewrite(boolean 
excludeCurrentVersion) throws ExecutionException, InterruptedException
+     public CompactionManager.AllSSTableOpStatus sstablesRewrite(boolean 
excludeCurrentVersion, int jobs) throws ExecutionException, InterruptedException
      {
-         return 
CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, 
excludeCurrentVersion);
+         return 
CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, 
excludeCurrentVersion, jobs);
      }
  
      public void markObsolete(Collection<SSTableReader> sstables, 
OperationType compactionType)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/897ffe87/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index b015bcd,e382cab..ca02747
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -263,21 -271,37 +263,31 @@@ public class CompactionManager implemen
          }
      }
  
+     /**
+      * Run an operation over all sstables using jobs threads
+      *
+      * @param cfs the column family store to run the operation on
+      * @param operation the operation to run
+      * @param jobs the number of threads to use - 0 means use all available. 
It never uses more than concurrent_compactors threads
+      * @return status of the operation
+      * @throws ExecutionException
+      * @throws InterruptedException
+      */
 -    private AllSSTableOpStatus parallelAllSSTableOperation(final 
ColumnFamilyStore cfs, final OneSSTableOperation operation, int jobs) throws 
ExecutionException, InterruptedException
 +    @SuppressWarnings("resource")
-     private AllSSTableOpStatus parallelAllSSTableOperation(final 
ColumnFamilyStore cfs, final OneSSTableOperation operation, OperationType 
operationType) throws ExecutionException, InterruptedException
++    private AllSSTableOpStatus parallelAllSSTableOperation(final 
ColumnFamilyStore cfs, final OneSSTableOperation operation, int jobs, 
OperationType operationType) throws ExecutionException, InterruptedException
      {
 -        Iterable<SSTableReader> compactingSSTables = cfs.markAllCompacting();
 -        if (compactingSSTables == null)
 +        List<LifecycleTransaction> transactions = new ArrayList<>();
 +        try (LifecycleTransaction compacting = 
cfs.markAllCompacting(operationType))
          {
 -            logger.info("Aborting operation on {}.{} after failing to 
interrupt other compaction operations", cfs.keyspace.getName(), cfs.name);
 -            return AllSSTableOpStatus.ABORTED;
 -        }
 -        if (Iterables.isEmpty(compactingSSTables))
 -        {
 -            logger.info("No sstables for {}.{}", cfs.keyspace.getName(), 
cfs.name);
 -            return AllSSTableOpStatus.SUCCESSFUL;
 -        }
 -        Set<SSTableReader> sstables = 
Sets.newHashSet(operation.filterSSTables(compactingSSTables));
 -        Set<SSTableReader> filteredAway = 
Sets.difference(Sets.newHashSet(compactingSSTables), sstables);
 -        cfs.getDataTracker().unmarkCompacting(filteredAway);
 -        final Set<SSTableReader> finished = Sets.newConcurrentHashSet();
 +            Iterable<SSTableReader> sstables = compacting != null ? 
Lists.newArrayList(operation.filterSSTables(compacting)) : 
Collections.<SSTableReader>emptyList();
 +            if (Iterables.isEmpty(sstables))
 +            {
 +                logger.info("No sstables for {}.{}", cfs.keyspace.getName(), 
cfs.name);
 +                return AllSSTableOpStatus.SUCCESSFUL;
 +            }
 +
 +            List<Future<Object>> futures = new ArrayList<>();
  
 -        List<Future<Object>> futures = new ArrayList<>();
 -        try
 -        {
              for (final SSTableReader sstable : sstables)
              {
                  if (executor.isShutdown())
@@@ -286,23 -310,31 +296,27 @@@
                      return AllSSTableOpStatus.ABORTED;
                  }
  
 +                final LifecycleTransaction txn = 
compacting.split(singleton(sstable));
 +                transactions.add(txn);
-                 futures.add(executor.submit(new Callable<Object>()
+                 Callable<Object> callable = new Callable<Object>()
                  {
                      @Override
                      public Object call() throws Exception
                      {
 -                        try
 -                        {
 -                            operation.execute(sstable);
 -                        }
 -                        finally
 -                        {
 -                            
cfs.getDataTracker().unmarkCompacting(Collections.singleton(sstable));
 -                            finished.add(sstable);
 -                        }
 +                        operation.execute(txn);
                          return this;
                      }
-                 }));
+                 };
+                 futures.add(executor.submit(callable));
+                 if (jobs > 0 && futures.size() == jobs)
+                 {
+                     FBUtilities.waitOnFutures(futures);
+                     futures.clear();
+                 }
              }
- 
-             assert compacting.originals().isEmpty();
- 
              FBUtilities.waitOnFutures(futures);
++            assert compacting.originals().isEmpty();
 +            return AllSSTableOpStatus.SUCCESSFUL;
          }
          finally
          {
@@@ -327,9 -358,9 +341,9 @@@
          }
      }
  
-     public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final 
boolean skipCorrupted, final boolean checkData)
 -    public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final 
boolean skipCorrupted, final boolean checkData, int jobs) throws 
InterruptedException, ExecutionException
++    public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final 
boolean skipCorrupted, final boolean checkData, int jobs)
 +    throws InterruptedException, ExecutionException
      {
 -        assert !cfs.isIndex();
          return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
          {
              @Override
@@@ -343,29 -374,10 +357,29 @@@
              {
                  scrubOne(cfs, input, skipCorrupted, checkData);
              }
-         }, OperationType.SCRUB);
 -        }, jobs);
++        }, jobs, OperationType.SCRUB);
 +    }
 +
 +    public AllSSTableOpStatus performVerify(final ColumnFamilyStore cfs, 
final boolean extendedVerify) throws InterruptedException, ExecutionException
 +    {
 +        assert !cfs.isIndex();
 +        return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
 +        {
 +            @Override
 +            public Iterable<SSTableReader> 
filterSSTables(LifecycleTransaction input)
 +            {
 +                return input.originals();
 +            }
 +
 +            @Override
 +            public void execute(LifecycleTransaction input) throws IOException
 +            {
 +                verifyOne(cfs, input.onlyOne(), extendedVerify);
 +            }
-         }, OperationType.VERIFY);
++        }, 0, OperationType.VERIFY);
      }
  
-     public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore 
cfs, final boolean excludeCurrentVersion) throws InterruptedException, 
ExecutionException
+     public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore 
cfs, final boolean excludeCurrentVersion, int jobs) throws 
InterruptedException, ExecutionException
      {
          return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
          {
@@@ -394,10 -402,10 +408,10 @@@
                  task.setCompactionType(OperationType.UPGRADE_SSTABLES);
                  task.execute(metrics);
              }
-         }, OperationType.UPGRADE_SSTABLES);
 -        }, jobs);
++        }, jobs, OperationType.UPGRADE_SSTABLES);
      }
  
-     public AllSSTableOpStatus performCleanup(final ColumnFamilyStore cfStore) 
throws InterruptedException, ExecutionException
+     public AllSSTableOpStatus performCleanup(final ColumnFamilyStore cfStore, 
int jobs) throws InterruptedException, ExecutionException
      {
          assert !cfStore.isIndex();
          Keyspace keyspace = cfStore.keyspace;
@@@ -425,12 -428,12 +439,12 @@@
              }
  
              @Override
 -            public void execute(SSTableReader input) throws IOException
 +            public void execute(LifecycleTransaction txn) throws IOException
              {
                  CleanupStrategy cleanupStrategy = 
CleanupStrategy.get(cfStore, ranges);
 -                doCleanupOne(cfStore, input, cleanupStrategy, ranges, 
hasIndexes);
 +                doCleanupOne(cfStore, txn, cleanupStrategy, ranges, 
hasIndexes);
              }
-         }, OperationType.CLEANUP);
 -        }, jobs);
++        }, jobs, OperationType.CLEANUP);
      }
  
      public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore 
cfs,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/897ffe87/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 5d29a5a,507aedb..bca5996
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -2555,7 -2385,12 +2555,12 @@@ public class StorageService extends Not
  
      public int forceKeyspaceCleanup(String keyspaceName, String... 
columnFamilies) throws IOException, ExecutionException, InterruptedException
      {
+         return forceKeyspaceCleanup(0, keyspaceName, columnFamilies);
+     }
+ 
+     public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... 
columnFamilies) throws IOException, ExecutionException, InterruptedException
+     {
 -        if (keyspaceName.equals(Keyspace.SYSTEM_KS))
 +        if (keyspaceName.equals(SystemKeyspace.NAME))
              throw new RuntimeException("Cleanup of the system keyspace is 
neither necessary nor wise");
  
          CompactionManager.AllSSTableOpStatus status = 
CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
@@@ -2575,30 -2410,27 +2580,39 @@@
  
      public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean 
checkData, String keyspaceName, String... columnFamilies) throws IOException, 
ExecutionException, InterruptedException
      {
+         return scrub(disableSnapshot, skipCorrupted, checkData, 0, 
keyspaceName, columnFamilies);
+     }
+ 
+     public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean 
checkData, int jobs, String keyspaceName, String... columnFamilies) throws 
IOException, ExecutionException, InterruptedException
+     {
          CompactionManager.AllSSTableOpStatus status = 
CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
 -        for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, 
keyspaceName, columnFamilies))
 +        for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, false, 
keyspaceName, columnFamilies))
          {
-             CompactionManager.AllSSTableOpStatus oneStatus = 
cfStore.scrub(disableSnapshot, skipCorrupted, checkData);
+             CompactionManager.AllSSTableOpStatus oneStatus = 
cfStore.scrub(disableSnapshot, skipCorrupted, checkData, jobs);
              if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL)
                  status = oneStatus;
          }
          return status.statusCode;
      }
- 
 +    public int verify(boolean extendedVerify, String keyspaceName, String... 
columnFamilies) throws IOException, ExecutionException, InterruptedException
 +    {
 +        CompactionManager.AllSSTableOpStatus status = 
CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
 +        for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, 
keyspaceName, columnFamilies))
 +        {
 +            CompactionManager.AllSSTableOpStatus oneStatus = 
cfStore.verify(extendedVerify);
 +            if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL)
 +                status = oneStatus;
 +        }
 +        return status.statusCode;
 +    }
 +
      public int upgradeSSTables(String keyspaceName, boolean 
excludeCurrentVersion, String... columnFamilies) throws IOException, 
ExecutionException, InterruptedException
      {
+         return upgradeSSTables(keyspaceName, excludeCurrentVersion, 2, 
columnFamilies);
+     }
+ 
+     public int upgradeSSTables(String keyspaceName, boolean 
excludeCurrentVersion, int jobs, String... columnFamilies) throws IOException, 
ExecutionException, InterruptedException
+     {
          CompactionManager.AllSSTableOpStatus status = 
CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
          for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, true, 
keyspaceName, columnFamilies))
          {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/897ffe87/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 7c5cd0a,d3a1725..761eed6
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@@ -261,17 -272,11 +263,19 @@@ public interface StorageServiceMBean ex
       */
      @Deprecated
      public int scrub(boolean disableSnapshot, boolean skipCorrupted, String 
keyspaceName, String... columnFamilies) throws IOException, ExecutionException, 
InterruptedException;
+     @Deprecated
      public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean 
checkData, String keyspaceName, String... columnFamilies) throws IOException, 
ExecutionException, InterruptedException;
+     public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean 
checkData, int jobs, String keyspaceName, String... columnFamilies) throws 
IOException, ExecutionException, InterruptedException;
  
      /**
 +     * Verify (checksums of) the given keyspace.
 +     * If columnFamilies array is empty, all CFs are verified.
 +     *
 +     * The entire sstable will be read to ensure each cell validates if 
extendedVerify is true
 +     */
 +    public int verify(boolean extendedVerify, String keyspaceName, String... 
columnFamilies) throws IOException, ExecutionException, InterruptedException;
 +
 +    /**
       * Rewrite all sstables to the latest version.
       * Unlike scrub, it doesn't skip bad rows and do not snapshot sstables 
first.
       */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/897ffe87/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/NodeProbe.java
index 93804a8,ab08e9f..2f27cea
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@@ -53,6 -62,8 +53,7 @@@ import javax.management.remote.JMXConne
  import javax.management.remote.JMXServiceURL;
  import javax.rmi.ssl.SslRMIClientSocketFactory;
  
 -import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
+ import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.ColumnFamilyStoreMBean;
  import org.apache.cassandra.db.HintedHandOffManager;
  import org.apache.cassandra.db.HintedHandOffManagerMBean;
@@@ -230,60 -238,53 +231,69 @@@ public class NodeProbe implements AutoC
          jmxc.close();
      }
  
-     public int forceKeyspaceCleanup(String keyspaceName, String... 
columnFamilies) throws IOException, ExecutionException, InterruptedException
+     public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... 
columnFamilies) throws IOException, ExecutionException, InterruptedException
      {
-         return ssProxy.forceKeyspaceCleanup(keyspaceName, columnFamilies);
+         return ssProxy.forceKeyspaceCleanup(jobs, keyspaceName, 
columnFamilies);
      }
  
-     public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean 
checkData, String keyspaceName, String... columnFamilies) throws IOException, 
ExecutionException, InterruptedException
+     public int scrub(boolean disableSnapshot, boolean skipCorrupted, boolean 
checkData, int jobs, String keyspaceName, String... columnFamilies) throws 
IOException, ExecutionException, InterruptedException
      {
-         return ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, 
keyspaceName, columnFamilies);
+         return ssProxy.scrub(disableSnapshot, skipCorrupted, checkData, jobs, 
keyspaceName, columnFamilies);
      }
  
 +    public int verify(boolean extendedVerify, String keyspaceName, String... 
columnFamilies) throws IOException, ExecutionException, InterruptedException
 +    {
 +        return ssProxy.verify(extendedVerify, keyspaceName, columnFamilies);
 +    }
 +
-     public int upgradeSSTables(String keyspaceName, boolean 
excludeCurrentVersion, String... columnFamilies) throws IOException, 
ExecutionException, InterruptedException
+     public int upgradeSSTables(String keyspaceName, boolean 
excludeCurrentVersion, int jobs, String... columnFamilies) throws IOException, 
ExecutionException, InterruptedException
      {
-         return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, 
columnFamilies);
+         return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, 
jobs, columnFamilies);
      }
  
-     public void forceKeyspaceCleanup(PrintStream out, String keyspaceName, 
String... columnFamilies) throws IOException, ExecutionException, 
InterruptedException
+     private void checkJobs(PrintStream out, int jobs)
      {
-         if (forceKeyspaceCleanup(keyspaceName, columnFamilies) != 0)
+         if (jobs > DatabaseDescriptor.getConcurrentCompactors())
+             out.println(String.format("jobs (%d) is bigger than configured 
concurrent_compactors (%d), using at most %d threads", jobs, 
DatabaseDescriptor.getConcurrentCompactors(), 
DatabaseDescriptor.getConcurrentCompactors()));
+     }
+ 
+     public void forceKeyspaceCleanup(PrintStream out, int jobs, String 
keyspaceName, String... columnFamilies) throws IOException, ExecutionException, 
InterruptedException
+     {
++        checkJobs(out, jobs);
+         if (forceKeyspaceCleanup(jobs, keyspaceName, columnFamilies) != 0)
          {
              failed = true;
 -            out.println("Aborted cleaning up atleast one column family in 
keyspace "+keyspaceName+", check server logs for more information.");
 +            out.println("Aborted cleaning up at least one table in keyspace 
"+keyspaceName+", check server logs for more information.");
          }
      }
  
-     public void scrub(PrintStream out, boolean disableSnapshot, boolean 
skipCorrupted, boolean checkData, String keyspaceName, String... 
columnFamilies) throws IOException, ExecutionException, InterruptedException
+     public void scrub(PrintStream out, boolean disableSnapshot, boolean 
skipCorrupted, boolean checkData, int jobs, String keyspaceName, String... 
columnFamilies) throws IOException, ExecutionException, InterruptedException
      {
-         if (scrub(disableSnapshot, skipCorrupted, checkData, keyspaceName, 
columnFamilies) != 0)
+         checkJobs(out, jobs);
+         if (scrub(disableSnapshot, skipCorrupted, checkData, jobs, 
keyspaceName, columnFamilies) != 0)
          {
              failed = true;
 -            out.println("Aborted scrubbing atleast one column family in 
keyspace "+keyspaceName+", check server logs for more information.");
 +            out.println("Aborted scrubbing at least one table in keyspace 
"+keyspaceName+", check server logs for more information.");
 +        }
 +    }
 +
 +    public void verify(PrintStream out, boolean extendedVerify, String 
keyspaceName, String... columnFamilies) throws IOException, ExecutionException, 
InterruptedException
 +    {
 +        if (verify(extendedVerify, keyspaceName, columnFamilies) != 0)
 +        {
 +            failed = true;
 +            out.println("Aborted verifying at least one table in keyspace 
"+keyspaceName+", check server logs for more information.");
          }
      }
  
 +
-     public void upgradeSSTables(PrintStream out, String keyspaceName, boolean 
excludeCurrentVersion, String... columnFamilies) throws IOException, 
ExecutionException, InterruptedException
+     public void upgradeSSTables(PrintStream out, String keyspaceName, boolean 
excludeCurrentVersion, int jobs, String... columnFamilies) throws IOException, 
ExecutionException, InterruptedException
      {
-         if (upgradeSSTables(keyspaceName, excludeCurrentVersion, 
columnFamilies) != 0)
+         checkJobs(out, jobs);
+         if (upgradeSSTables(keyspaceName, excludeCurrentVersion, jobs, 
columnFamilies) != 0)
          {
              failed = true;
 -            out.println("Aborted upgrading sstables for atleast one column 
family in keyspace "+keyspaceName+", check server logs for more information.");
 +            out.println("Aborted upgrading sstables for atleast one table in 
keyspace "+keyspaceName+", check server logs for more information.");
          }
      }
  

http://git-wip-us.apache.org/repos/asf/cassandra/blob/897ffe87/src/java/org/apache/cassandra/tools/nodetool/Cleanup.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/nodetool/Cleanup.java
index aa415b3,0000000..6c6676d
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Cleanup.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Cleanup.java
@@@ -1,56 -1,0 +1,62 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.tools.nodetool;
 +
 +import io.airlift.command.Arguments;
 +import io.airlift.command.Command;
 +
 +import java.util.ArrayList;
 +import java.util.List;
 +
++import io.airlift.command.Option;
 +import org.apache.cassandra.db.SystemKeyspace;
 +import org.apache.cassandra.tools.NodeProbe;
 +import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
 +
 +@Command(name = "cleanup", description = "Triggers the immediate cleanup of keys no longer belonging to a node. By default, clean all keyspaces")
 +public class Cleanup extends NodeToolCmd
 +{
 +    @Arguments(usage = "[<keyspace> <tables>...]", description = "The keyspace followed by one or many tables")
 +    private List<String> args = new ArrayList<>();
 +
++    @Option(title = "jobs",
++            name = {"-j", "--jobs"},
++            description = "Number of sstables to cleanup simultanously, set to 0 to use all available compaction threads")
++    private int jobs = 2;
++
 +    @Override
 +    public void execute(NodeProbe probe)
 +    {
 +        List<String> keyspaces = parseOptionalKeyspace(args, probe);
 +        String[] cfnames = parseOptionalColumnFamilies(args);
 +
 +        for (String keyspace : keyspaces)
 +        {
 +            if (SystemKeyspace.NAME.equals(keyspace))
 +                continue;
 +
 +            try
 +            {
-                 probe.forceKeyspaceCleanup(System.out, keyspace, cfnames);
++                probe.forceKeyspaceCleanup(System.out, jobs, keyspace, cfnames);
 +            } catch (Exception e)
 +            {
 +                throw new RuntimeException("Error occurred during cleanup", e);
 +            }
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/897ffe87/src/java/org/apache/cassandra/tools/nodetool/Scrub.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/nodetool/Scrub.java
index 54f981e,0000000..dafe8d1
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/tools/nodetool/Scrub.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/Scrub.java
@@@ -1,71 -1,0 +1,76 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.tools.nodetool;
 +
 +import io.airlift.command.Arguments;
 +import io.airlift.command.Command;
 +import io.airlift.command.Option;
 +
 +import java.util.ArrayList;
 +import java.util.List;
 +
 +import org.apache.cassandra.tools.NodeProbe;
 +import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
 +
 +@Command(name = "scrub", description = "Scrub (rebuild sstables for) one or more tables")
 +public class Scrub extends NodeToolCmd
 +{
 +    @Arguments(usage = "[<keyspace> <tables>...]", description = "The keyspace followed by one or many tables")
 +    private List<String> args = new ArrayList<>();
 +
 +    @Option(title = "disable_snapshot",
 +            name = {"-ns", "--no-snapshot"},
 +            description = "Scrubbed CFs will be snapshotted first, if disableSnapshot is false. (default false)")
 +    private boolean disableSnapshot = false;
 +
 +    @Option(title = "skip_corrupted",
 +            name = {"-s", "--skip-corrupted"},
 +            description = "Skip corrupted partitions even when scrubbing counter tables. (default false)")
 +    private boolean skipCorrupted = false;
 +
 +    @Option(title = "no_validate",
 +                   name = {"-n", "--no-validate"},
 +                   description = "Do not validate columns using column validator")
 +    private boolean noValidation = false;
 +
++    @Option(title = "jobs",
++            name = {"-j", "--jobs"},
++            description = "Number of sstables to scrub simultanously, set to 0 to use all available compaction threads")
++    private int jobs = 2;
++
 +    @Override
 +    public void execute(NodeProbe probe)
 +    {
 +        List<String> keyspaces = parseOptionalKeyspace(args, probe);
 +        String[] cfnames = parseOptionalColumnFamilies(args);
 +
 +        for (String keyspace : keyspaces)
 +        {
 +            try
 +            {
-                 probe.scrub(System.out, disableSnapshot, skipCorrupted, !noValidation, keyspace, cfnames);
++                probe.scrub(System.out, disableSnapshot, skipCorrupted, !noValidation, jobs, keyspace, cfnames);
 +            } catch (IllegalArgumentException e)
 +            {
 +                throw e;
 +            } catch (Exception e)
 +            {
 +                throw new RuntimeException("Error occurred during scrubbing", e);
 +            }
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/897ffe87/src/java/org/apache/cassandra/tools/nodetool/UpgradeSSTable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/nodetool/UpgradeSSTable.java
index 86a2cd5,0000000..596f353
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/tools/nodetool/UpgradeSSTable.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/UpgradeSSTable.java
@@@ -1,56 -1,0 +1,61 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.tools.nodetool;
 +
 +import io.airlift.command.Arguments;
 +import io.airlift.command.Command;
 +import io.airlift.command.Option;
 +
 +import java.util.ArrayList;
 +import java.util.List;
 +
 +import org.apache.cassandra.tools.NodeProbe;
 +import org.apache.cassandra.tools.NodeTool.NodeToolCmd;
 +
 +@Command(name = "upgradesstables", description = "Rewrite sstables (for the requested tables) that are not on the current version (thus upgrading them to said current version)")
 +public class UpgradeSSTable extends NodeToolCmd
 +{
 +    @Arguments(usage = "[<keyspace> <tables>...]", description = "The keyspace followed by one or many tables")
 +    private List<String> args = new ArrayList<>();
 +
 +    @Option(title = "include_all", name = {"-a", "--include-all-sstables"}, description = "Use -a to include all sstables, even those already on the current version")
 +    private boolean includeAll = false;
 +
++    @Option(title = "jobs",
++            name = {"-j", "--jobs"},
++            description = "Number of sstables to upgrade simultanously, set to 0 to use all available compaction threads")
++    private int jobs = 2;
++
 +    @Override
 +    public void execute(NodeProbe probe)
 +    {
 +        List<String> keyspaces = parseOptionalKeyspace(args, probe);
 +        String[] cfnames = parseOptionalColumnFamilies(args);
 +
 +        for (String keyspace : keyspaces)
 +        {
 +            try
 +            {
-                 probe.upgradeSSTables(System.out, keyspace, !includeAll, cfnames);
++                probe.upgradeSSTables(System.out, keyspace, !includeAll, jobs, cfnames);
 +            } catch (Exception e)
 +            {
 +                throw new RuntimeException("Error occurred during enabling auto-compaction", e);
 +            }
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/897ffe87/test/unit/org/apache/cassandra/db/CleanupTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/897ffe87/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/db/ScrubTest.java
index c0cde40,4efd082..4cca7ff
--- a/test/unit/org/apache/cassandra/db/ScrubTest.java
+++ b/test/unit/org/apache/cassandra/db/ScrubTest.java
@@@ -588,14 -499,14 +588,14 @@@ public class ScrubTes
      @Test
      public void testScrubColumnValidation() throws InterruptedException, RequestExecutionException, ExecutionException
      {
 -        QueryProcessor.process("CREATE TABLE \"Keyspace1\".test_compact_static_columns (a bigint, b timeuuid, c boolean static, d text, PRIMARY KEY (a, b))", ConsistencyLevel.ONE);
 +        QueryProcessor.process(String.format("CREATE TABLE \"%s\".test_compact_static_columns (a bigint, b timeuuid, c boolean static, d text, PRIMARY KEY (a, b))", KEYSPACE), ConsistencyLevel.ONE);
  
 -        Keyspace keyspace = Keyspace.open("Keyspace1");
 +        Keyspace keyspace = Keyspace.open(KEYSPACE);
          ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("test_compact_static_columns");
  
 -        QueryProcessor.executeInternal("INSERT INTO \"Keyspace1\".test_compact_static_columns (a, b, c, d) VALUES (123, c3db07e8-b602-11e3-bc6b-e0b9a54a6d93, true, 'foobar')");
 +        QueryProcessor.executeInternal(String.format("INSERT INTO \"%s\".test_compact_static_columns (a, b, c, d) VALUES (123, c3db07e8-b602-11e3-bc6b-e0b9a54a6d93, true, 'foobar')", KEYSPACE));
          cfs.forceBlockingFlush();
-         CompactionManager.instance.performScrub(cfs, false, true);
+         CompactionManager.instance.performScrub(cfs, false, true, 2);
  
          QueryProcessor.process("CREATE TABLE \"Keyspace1\".test_scrub_validation (a text primary key, b int)", ConsistencyLevel.ONE);
          ColumnFamilyStore cfs2 = keyspace.getColumnFamilyStore("test_scrub_validation");
@@@ -614,15 -525,15 +614,15 @@@
      @Test
      public void testColumnNameEqualToDefaultKeyAlias() throws ExecutionException, InterruptedException
      {
 -        Keyspace keyspace = Keyspace.open("Keyspace1");
 -        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("UUIDKeys");
 +        Keyspace keyspace = Keyspace.open(KEYSPACE);
 +        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_UUID);
  
 -        ColumnFamily cf = ArrayBackedSortedColumns.factory.create("Keyspace1", "UUIDKeys");
 +        ColumnFamily cf = ArrayBackedSortedColumns.factory.create(KEYSPACE, CF_UUID);
          cf.addColumn(column(CFMetaData.DEFAULT_KEY_ALIAS, "not a uuid", 1L));
 -        Mutation mutation = new Mutation("Keyspace1", ByteBufferUtil.bytes(UUIDGen.getTimeUUID()), cf);
 +        Mutation mutation = new Mutation(KEYSPACE, ByteBufferUtil.bytes(UUIDGen.getTimeUUID()), cf);
          mutation.applyUnsafe();
          cfs.forceBlockingFlush();
-         CompactionManager.instance.performScrub(cfs, false, true);
+         CompactionManager.instance.performScrub(cfs, false, true, 2);
  
          assertEquals(1, cfs.getSSTables().size());
      }
@@@ -634,19 -545,19 +634,19 @@@
      @Test
      public void testValidationCompactStorage() throws Exception
      {
 -        QueryProcessor.process("CREATE TABLE \"Keyspace1\".test_compact_dynamic_columns (a int, b text, c text, PRIMARY KEY (a, b)) WITH COMPACT STORAGE", ConsistencyLevel.ONE);
 +        QueryProcessor.process(String.format("CREATE TABLE \"%s\".test_compact_dynamic_columns (a int, b text, c text, PRIMARY KEY (a, b)) WITH COMPACT STORAGE", KEYSPACE), ConsistencyLevel.ONE);
  
 -        Keyspace keyspace = Keyspace.open("Keyspace1");
 +        Keyspace keyspace = Keyspace.open(KEYSPACE);
          ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("test_compact_dynamic_columns");
  
 -        QueryProcessor.executeInternal("INSERT INTO \"Keyspace1\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'a', 'foo')");
 -        QueryProcessor.executeInternal("INSERT INTO \"Keyspace1\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'b', 'bar')");
 -        QueryProcessor.executeInternal("INSERT INTO \"Keyspace1\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'c', 'boo')");
 +        QueryProcessor.executeInternal(String.format("INSERT INTO \"%s\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'a', 'foo')", KEYSPACE));
 +        QueryProcessor.executeInternal(String.format("INSERT INTO \"%s\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'b', 'bar')", KEYSPACE));
 +        QueryProcessor.executeInternal(String.format("INSERT INTO \"%s\".test_compact_dynamic_columns (a, b, c) VALUES (0, 'c', 'boo')", KEYSPACE));
          cfs.forceBlockingFlush();
-         CompactionManager.instance.performScrub(cfs, true, true);
+         CompactionManager.instance.performScrub(cfs, true, true, 2);
  
          // Scrub is silent, but it will remove broken records. So reading everything back to make sure nothing to "scrubbed away"
 -        UntypedResultSet rs = QueryProcessor.executeInternal("SELECT * FROM \"Keyspace1\".test_compact_dynamic_columns");
 +        UntypedResultSet rs = QueryProcessor.executeInternal(String.format("SELECT * FROM \"%s\".test_compact_dynamic_columns", KEYSPACE));
          assertEquals(3, rs.size());
  
          Iterator<UntypedResultSet.Row> iter = rs.iterator();
@@@ -654,129 -565,4 +654,129 @@@
          assertEquals("bar", iter.next().getString("c"));
          assertEquals("boo", iter.next().getString("c"));
      }
 +
 +    @Test /* CASSANDRA-5174 */
 +    public void testScrubKeysIndex_preserveOrder() throws IOException, ExecutionException, InterruptedException
 +    {
 +        //If the partitioner preserves the order then SecondaryIndex uses BytesType comparator,
 +        // otherwise it uses LocalByPartitionerType
 +        setKeyComparator(BytesType.instance);
 +        testScrubIndex(CF_INDEX1, COL_KEYS_INDEX, false, true);
 +    }
 +
 +    @Test /* CASSANDRA-5174 */
 +    public void testScrubCompositeIndex_preserveOrder() throws IOException, ExecutionException, InterruptedException
 +    {
 +        setKeyComparator(BytesType.instance);
 +        testScrubIndex(CF_INDEX2, COL_COMPOSITES_INDEX, true, true);
 +    }
 +
 +    @Test /* CASSANDRA-5174 */
 +    public void testScrubKeysIndex() throws IOException, ExecutionException, InterruptedException
 +    {
 +        setKeyComparator(new LocalByPartionerType(StorageService.getPartitioner()));
 +        testScrubIndex(CF_INDEX1, COL_KEYS_INDEX, false, true);
 +    }
 +
 +    @Test /* CASSANDRA-5174 */
 +    public void testScrubCompositeIndex() throws IOException, ExecutionException, InterruptedException
 +    {
 +        setKeyComparator(new LocalByPartionerType(StorageService.getPartitioner()));
 +        testScrubIndex(CF_INDEX2, COL_COMPOSITES_INDEX, true, true);
 +    }
 +
 +    @Test /* CASSANDRA-5174 */
 +    public void testFailScrubKeysIndex() throws IOException, ExecutionException, InterruptedException
 +    {
 +        testScrubIndex(CF_INDEX1, COL_KEYS_INDEX, false, false);
 +    }
 +
 +    @Test /* CASSANDRA-5174 */
 +    public void testFailScrubCompositeIndex() throws IOException, ExecutionException, InterruptedException
 +    {
 +        testScrubIndex(CF_INDEX2, COL_COMPOSITES_INDEX, true, false);
 +    }
 +
 +    @Test /* CASSANDRA-5174 */
 +    public void testScrubTwice() throws IOException, ExecutionException, InterruptedException
 +    {
 +        testScrubIndex(CF_INDEX1, COL_KEYS_INDEX, false, true, true);
 +    }
 +
 +    /** The SecondaryIndex class is used for custom indexes so to avoid
 +     * making a public final field into a private field with getters
 +     * and setters, we resort to this hack in order to test it properly
 +     * since it can have two values which influence the scrubbing behavior.
 +     * @param comparator - the key comparator we want to test
 +     */
 +    private void setKeyComparator(AbstractType<?> comparator)
 +    {
 +        try
 +        {
 +            Field keyComparator = SecondaryIndex.class.getDeclaredField("keyComparator");
 +            keyComparator.setAccessible(true);
 +            int modifiers = keyComparator.getModifiers();
 +            Field modifierField = keyComparator.getClass().getDeclaredField("modifiers");
 +            modifiers = modifiers & ~Modifier.FINAL;
 +            modifierField.setAccessible(true);
 +            modifierField.setInt(keyComparator, modifiers);
 +
 +            keyComparator.set(null, comparator);
 +        }
 +        catch (Exception ex)
 +        {
 +            fail("Failed to change key comparator in secondary index : " + ex.getMessage());
 +            ex.printStackTrace();
 +        }
 +    }
 +
 +    private void testScrubIndex(String cfName, String colName, boolean composite, boolean ... scrubs)
 +            throws IOException, ExecutionException, InterruptedException
 +    {
 +        CompactionManager.instance.disableAutoCompaction();
 +        Keyspace keyspace = Keyspace.open(KEYSPACE);
 +        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cfName);
 +        cfs.clearUnsafe();
 +
 +        int numRows = 1000;
 +        long[] colValues = new long [numRows * 2]; // each row has two columns
 +        for (int i = 0; i < colValues.length; i+=2)
 +        {
 +            colValues[i] = (i % 4 == 0 ? 1L : 2L); // index column
 +            colValues[i+1] = 3L; //other column
 +        }
 +        fillIndexCF(cfs, composite, colValues);
 +
 +        // check index
 +        IndexExpression expr = new IndexExpression(ByteBufferUtil.bytes(colName), Operator.EQ, ByteBufferUtil.bytes(1L));
 +        List<Row> rows = cfs.search(Util.range("", ""), Arrays.asList(expr), new IdentityQueryFilter(), numRows);
 +        assertNotNull(rows);
 +        assertEquals(numRows / 2, rows.size());
 +
 +        // scrub index
 +        Set<ColumnFamilyStore> indexCfss = cfs.indexManager.getIndexesBackedByCfs();
 +        assertTrue(indexCfss.size() == 1);
 +        for(ColumnFamilyStore indexCfs : indexCfss)
 +        {
 +            for (int i = 0; i < scrubs.length; i++)
 +            {
 +                boolean failure = !scrubs[i];
 +                if (failure)
 +                { //make sure the next scrub fails
 +                    overrideWithGarbage(indexCfs.getSSTables().iterator().next(), ByteBufferUtil.bytes(1L), ByteBufferUtil.bytes(2L));
 +                }
-                 CompactionManager.AllSSTableOpStatus result = indexCfs.scrub(false, false, true, true);
++                CompactionManager.AllSSTableOpStatus result = indexCfs.scrub(false, false, true, true, 0);
 +                assertEquals(failure ?
 +                             CompactionManager.AllSSTableOpStatus.ABORTED :
 +                             CompactionManager.AllSSTableOpStatus.SUCCESSFUL,
 +                                result);
 +            }
 +        }
 +
 +
 +        // check index is still working
 +        rows = cfs.search(Util.range("", ""), Arrays.asList(expr), new IdentityQueryFilter(), numRows);
 +        assertNotNull(rows);
 +        assertEquals(numRows / 2, rows.size());
 +    }
  }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/897ffe87/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------

Reply via email to