Merge branch 'cassandra-3.0' into cassandra-3.5

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0a250854
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0a250854
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0a250854

Branch: refs/heads/trunk
Commit: 0a2508544069f035dc43a81b8700c0b7bf2d28e3
Parents: c7ef7c9 0c91977
Author: Marcus Eriksson <marc...@apache.org>
Authored: Tue Mar 29 11:05:46 2016 +0200
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Tue Mar 29 11:05:46 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 20 ++++-----
 .../db/compaction/CompactionManager.java        | 44 +++++++++++++-------
 .../cassandra/service/StorageService.java       | 30 ++++++++++---
 .../cassandra/service/StorageServiceMBean.java  |  8 ++++
 .../org/apache/cassandra/tools/NodeProbe.java   | 38 ++++++++++-------
 .../cassandra/tools/nodetool/Cleanup.java       |  9 +++-
 .../tools/nodetool/RelocateSSTables.java        |  8 +++-
 .../apache/cassandra/tools/nodetool/Scrub.java  | 13 ++++--
 .../tools/nodetool/UpgradeSSTable.java          | 10 ++++-
 .../org/apache/cassandra/db/CleanupTest.java    |  6 +--
 .../unit/org/apache/cassandra/db/ScrubTest.java | 16 +++----
 12 files changed, 141 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a250854/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 2907df9,fdc873e..3e69959
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -25,43 -23,14 +25,45 @@@ Merged from 2.2
   * Only log yaml config once, at startup (CASSANDRA-11217)
   * Reference leak with parallel repairs on the same table (CASSANDRA-11215)
  Merged from 2.1:
+  * Add a -j parameter to scrub/cleanup/upgradesstables to state how
+    many threads to use (CASSANDRA-11179)
 - * Backport CASSANDRA-10679 (CASSANDRA-9598)
 - * InvalidateKeys should have a weak ref to key cache (CASSANDRA-11176)
   * COPY FROM on large datasets: fix progress report and debug performance 
(CASSANDRA-11053)
 + * InvalidateKeys should have a weak ref to key cache (CASSANDRA-11176)
  
 -3.0.4
 - * Preserve order for preferred SSL cipher suites (CASSANDRA-11164)
 +
 +3.4
 + * (cqlsh) add cqlshrc option to always connect using ssl (CASSANDRA-10458)
 + * Cleanup a few resource warnings (CASSANDRA-11085)
 + * Allow custom tracing implementations (CASSANDRA-10392)
 + * Extract LoaderOptions to be able to be used from outside (CASSANDRA-10637)
 + * fix OnDiskIndexTest to properly treat empty ranges (CASSANDRA-11205)
 + * fix TrackerTest to handle new notifications (CASSANDRA-11178)
 + * add SASI validation for partitioner and complex columns (CASSANDRA-11169)
 + * Add caching of encrypted credentials in PasswordAuthenticator 
(CASSANDRA-7715)
 + * fix SASI memtable switching on flush (CASSANDRA-11159)
 + * Remove duplicate offline compaction tracking (CASSANDRA-11148)
 + * fix EQ semantics of analyzed SASI indexes (CASSANDRA-11130)
 + * Support long name output for nodetool commands (CASSANDRA-7950)
 + * Encrypted hints (CASSANDRA-11040)
 + * SASI index options validation (CASSANDRA-11136)
 + * Optimize disk seek using min/max column name meta data when the LIMIT 
clause is used
 +   (CASSANDRA-8180)
 + * Add LIKE support to CQL3 (CASSANDRA-11067)
 + * Generic Java UDF types (CASSANDRA-10819)
 + * cqlsh: Include sub-second precision in timestamps by default 
(CASSANDRA-10428)
 + * Set javac encoding to utf-8 (CASSANDRA-11077)
 + * Integrate SASI index into Cassandra (CASSANDRA-10661)
 + * Add --skip-flush option to nodetool snapshot
 + * Skip values for non-queried columns (CASSANDRA-10657)
 + * Add support for secondary indexes on static columns (CASSANDRA-8103)
 + * CommitLogUpgradeTestMaker creates broken commit logs (CASSANDRA-11051)
 + * Add metric for number of dropped mutations (CASSANDRA-10866)
 + * Simplify row cache invalidation code (CASSANDRA-10396)
 + * Support user-defined compaction through nodetool (CASSANDRA-10660)
 + * Stripe view locks by key and table ID to reduce contention 
(CASSANDRA-10981)
 + * Add nodetool gettimeout and settimeout commands (CASSANDRA-10953)
 + * Add 3.0 metadata to sstablemetadata output (CASSANDRA-10838)
 +Merged from 3.0:
   * MV should only query complex columns included in the view (CASSANDRA-11069)
   * Failed aggregate creation breaks server permanently (CASSANDRA-11064)
   * Add sstabledump tool (CASSANDRA-7464)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a250854/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index fbfb7ee,a1db5b5..f193c4d
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@@ -1436,16 -1363,11 +1436,16 @@@ public class ColumnFamilyStore implemen
          return 
CompactionManager.instance.performVerify(ColumnFamilyStore.this, 
extendedVerify);
      }
  
-     public CompactionManager.AllSSTableOpStatus sstablesRewrite(boolean 
excludeCurrentVersion) throws ExecutionException, InterruptedException
+     public CompactionManager.AllSSTableOpStatus sstablesRewrite(boolean 
excludeCurrentVersion, int jobs) throws ExecutionException, InterruptedException
      {
-         return 
CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, 
excludeCurrentVersion);
+         return 
CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, 
excludeCurrentVersion, jobs);
      }
  
-     public CompactionManager.AllSSTableOpStatus relocateSSTables() throws 
ExecutionException, InterruptedException
++    public CompactionManager.AllSSTableOpStatus relocateSSTables(int jobs) 
throws ExecutionException, InterruptedException
 +    {
-         return CompactionManager.instance.relocateSSTables(this);
++        return CompactionManager.instance.relocateSSTables(this, jobs);
 +    }
 +
      public void markObsolete(Collection<SSTableReader> sstables, 
OperationType compactionType)
      {
          assert !sstables.isEmpty();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a250854/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 627ae32,f339660..f9b0997
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -438,80 -451,9 +452,80 @@@ public class CompactionManager implemen
                  CleanupStrategy cleanupStrategy = 
CleanupStrategy.get(cfStore, ranges, FBUtilities.nowInSeconds());
                  doCleanupOne(cfStore, txn, cleanupStrategy, ranges, 
hasIndexes);
              }
-         }, OperationType.CLEANUP);
+         }, jobs, OperationType.CLEANUP);
      }
  
-     public AllSSTableOpStatus relocateSSTables(final ColumnFamilyStore cfs) 
throws ExecutionException, InterruptedException
++    public AllSSTableOpStatus relocateSSTables(final ColumnFamilyStore cfs, 
int jobs) throws ExecutionException, InterruptedException
 +    {
 +        if (!cfs.getPartitioner().splitter().isPresent())
 +        {
 +            logger.info("Partitioner does not support splitting");
 +            return AllSSTableOpStatus.ABORTED;
 +        }
 +        final Collection<Range<Token>> r = 
StorageService.instance.getLocalRanges(cfs.keyspace.getName());
 +
 +        if (r.isEmpty())
 +        {
 +            logger.info("Relocate cannot run before a node has joined the 
ring");
 +            return AllSSTableOpStatus.ABORTED;
 +        }
 +
 +        final List<Range<Token>> localRanges = Range.sort(r);
 +        final Directories.DataDirectory[] locations = 
cfs.getDirectories().getWriteableLocations();
 +        final List<PartitionPosition> diskBoundaries = 
StorageService.getDiskBoundaries(localRanges, cfs.getPartitioner(), locations);
 +
 +        return parallelAllSSTableOperation(cfs, new OneSSTableOperation()
 +        {
 +            @Override
 +            public Iterable<SSTableReader> 
filterSSTables(LifecycleTransaction transaction)
 +            {
 +                Set<SSTableReader> originals = 
Sets.newHashSet(transaction.originals());
 +                Set<SSTableReader> needsRelocation = 
originals.stream().filter(s -> 
!inCorrectLocation(s)).collect(Collectors.toSet());
 +                transaction.cancel(Sets.difference(originals, 
needsRelocation));
 +
 +                Map<Integer, List<SSTableReader>> groupedByDisk = 
needsRelocation.stream().collect(Collectors.groupingBy((s) ->
 +                        
CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), 
s)));
 +
 +                int maxSize = 0;
 +                for (List<SSTableReader> diskSSTables : 
groupedByDisk.values())
 +                    maxSize = Math.max(maxSize, diskSSTables.size());
 +
 +                List<SSTableReader> mixedSSTables = new ArrayList<>();
 +
 +                for (int i = 0; i < maxSize; i++)
 +                    for (List<SSTableReader> diskSSTables : 
groupedByDisk.values())
 +                        if (i < diskSSTables.size())
 +                            mixedSSTables.add(diskSSTables.get(i));
 +
 +                return mixedSSTables;
 +            }
 +
 +            private boolean inCorrectLocation(SSTableReader sstable)
 +            {
 +                if (!cfs.getPartitioner().splitter().isPresent())
 +                    return true;
 +                int directoryIndex = 
CompactionStrategyManager.getCompactionStrategyIndex(cfs, cfs.getDirectories(), 
sstable);
 +                Directories.DataDirectory[] locations = 
cfs.getDirectories().getWriteableLocations();
 +
 +                Directories.DataDirectory location = 
locations[directoryIndex];
 +                PartitionPosition diskLast = 
diskBoundaries.get(directoryIndex);
 +                // the location we get from directoryIndex is based on the 
first key in the sstable
 +                // now we need to make sure the last key is less than the 
boundary as well:
 +                return 
sstable.descriptor.directory.getAbsolutePath().startsWith(location.location.getAbsolutePath())
 && sstable.last.compareTo(diskLast) <= 0;
 +            }
 +
 +            @Override
 +            public void execute(LifecycleTransaction txn) throws IOException
 +            {
 +                logger.debug("Relocating {}", txn.originals());
 +                AbstractCompactionTask task = 
cfs.getCompactionStrategyManager().getCompactionTask(txn, NO_GC, 
Long.MAX_VALUE);
 +                task.setUserDefined(true);
 +                task.setCompactionType(OperationType.RELOCATE);
 +                task.execute(metrics);
 +            }
-         }, OperationType.RELOCATE);
++        }, jobs, OperationType.RELOCATE);
 +    }
 +
      public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore 
cfs,
                                            final Collection<Range<Token>> 
ranges,
                                            final Refs<SSTableReader> sstables,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a250854/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 22d67e5,75573ac..52dcb97
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -2789,59 -2712,6 +2804,64 @@@ public class StorageService extends Not
          }
      }
  
 +    public int relocateSSTables(String keyspaceName, String ... 
columnFamilies) throws IOException, ExecutionException, InterruptedException
 +    {
++        return relocateSSTables(0, keyspaceName, columnFamilies);
++    }
++
++    public int relocateSSTables(int jobs, String keyspaceName, String ... 
columnFamilies) throws IOException, ExecutionException, InterruptedException
++    {
 +        CompactionManager.AllSSTableOpStatus status = 
CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
 +        for (ColumnFamilyStore cfs : getValidColumnFamilies(false, false, 
keyspaceName, columnFamilies))
 +        {
-             CompactionManager.AllSSTableOpStatus oneStatus = 
cfs.relocateSSTables();
++            CompactionManager.AllSSTableOpStatus oneStatus = 
cfs.relocateSSTables(jobs);
 +            if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL)
 +                status = oneStatus;
 +        }
 +        return status.statusCode;
 +    }
 +
 +    /**
 +     * Takes the snapshot of a multiple column family from different 
keyspaces. A snapshot name must be specified.
 +     *
 +     * @param tag
 +     *            the tag given to the snapshot; may not be null or empty
 +     * @param options
 +     *            Map of options (skipFlush is the only supported option for 
now)
 +     * @param entities
 +     *            list of keyspaces / tables in the form of empty | ks1 ks2 
... | ks1.cf1,ks2.cf2,...
 +     */
 +    @Override
 +    public void takeSnapshot(String tag, Map<String, String> options, 
String... entities) throws IOException
 +    {
 +        boolean skipFlush = 
Boolean.parseBoolean(options.getOrDefault("skipFlush", "false"));
 +
 +        if (entities != null && entities.length > 0 && 
entities[0].contains("."))
 +        {
 +            takeMultipleTableSnapshot(tag, skipFlush, entities);
 +        }
 +        else
 +        {
 +            takeSnapshot(tag, skipFlush, entities);
 +        }
 +    }
 +
 +    /**
 +     * Takes the snapshot of a specific table. A snapshot name must be
 +     * specified.
 +     *
 +     * @param keyspaceName
 +     *            the keyspace which holds the specified table
 +     * @param tableName
 +     *            the table to snapshot
 +     * @param tag
 +     *            the tag given to the snapshot; may not be null or empty
 +     */
 +    public void takeTableSnapshot(String keyspaceName, String tableName, 
String tag)
 +            throws IOException {
 +        takeMultipleTableSnapshot(tag, false, keyspaceName + "." + tableName);
 +    }
 +
      /**
       * Takes the snapshot for the given keyspaces. A snapshot name must be 
specified.
       *

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a250854/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageServiceMBean.java
index dc12253,5b7331d..2238f33
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@@ -251,7 -248,6 +251,9 @@@ public interface StorageServiceMBean ex
       */
      public void forceKeyspaceCompaction(boolean splitOutput, String 
keyspaceName, String... tableNames) throws IOException, ExecutionException, 
InterruptedException;
  
++    @Deprecated
 +    public int relocateSSTables(String keyspace, String ... cfnames) throws 
IOException, ExecutionException, InterruptedException;
++    public int relocateSSTables(int jobs, String keyspace, String ... 
cfnames) throws IOException, ExecutionException, InterruptedException;
      /**
       * Trigger a cleanup of keys on a single keyspace
       */

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a250854/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/NodeProbe.java
index 405f70c,a9d71d8..7d15926
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@@ -304,11 -309,6 +314,11 @@@ public class NodeProbe implements AutoC
          ssProxy.forceKeyspaceCompaction(splitOutput, keyspaceName, 
tableNames);
      }
  
-     public void relocateSSTables(String keyspace, String[] cfnames) throws 
IOException, ExecutionException, InterruptedException
++    public void relocateSSTables(int jobs, String keyspace, String[] cfnames) 
throws IOException, ExecutionException, InterruptedException
 +    {
-         ssProxy.relocateSSTables(keyspace, cfnames);
++        ssProxy.relocateSSTables(jobs, keyspace, cfnames);
 +    }
 +
      public void forceKeyspaceFlush(String keyspaceName, String... tableNames) 
throws IOException, ExecutionException, InterruptedException
      {
          ssProxy.forceKeyspaceFlush(keyspaceName, tableNames);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a250854/src/java/org/apache/cassandra/tools/nodetool/RelocateSSTables.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/nodetool/RelocateSSTables.java
index 8522bc4,0000000..7c3066c
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/tools/nodetool/RelocateSSTables.java
+++ b/src/java/org/apache/cassandra/tools/nodetool/RelocateSSTables.java
@@@ -1,49 -1,0 +1,55 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.tools.nodetool;
 +
 +import java.util.ArrayList;
 +import java.util.List;
 +
 +import io.airlift.command.Arguments;
 +import io.airlift.command.Command;
++import io.airlift.command.Option;
 +import org.apache.cassandra.tools.NodeProbe;
 +import org.apache.cassandra.tools.NodeTool;
 +
 +@Command(name = "relocatesstables", description = "Relocates sstables to the 
correct disk")
 +public class RelocateSSTables extends NodeTool.NodeToolCmd
 +{
 +    @Arguments(usage = "<keyspace> <table>", description = "The keyspace and 
table name")
 +    private List<String> args = new ArrayList<>();
 +
++    @Option(title = "jobs",
++            name = {"-j", "--jobs"},
++            description = "Number of sstables to relocate simultanously, set 
to 0 to use all available compaction threads")
++    private int jobs = 2;
++
 +    @Override
 +    public void execute(NodeProbe probe)
 +    {
 +        List<String> keyspaces = parseOptionalKeyspace(args, probe);
 +        String[] cfnames = parseOptionalTables(args);
 +        try
 +        {
 +            for (String keyspace : keyspaces)
-                 probe.relocateSSTables(keyspace, cfnames);
++                probe.relocateSSTables(jobs, keyspace, cfnames);
 +        }
 +        catch (Exception e)
 +        {
 +            throw new RuntimeException("Got error while relocating", e);
 +        }
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a250854/test/unit/org/apache/cassandra/db/CleanupTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0a250854/test/unit/org/apache/cassandra/db/ScrubTest.java
----------------------------------------------------------------------

Reply via email to