Merge branch 'cassandra-2.1' into cassandra-2.2

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/d434a33a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/d434a33a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/d434a33a

Branch: refs/heads/cassandra-3.1
Commit: d434a33ace2dfe6715f4857f9537ee884f4ef410
Parents: 73a730f a8e8a67
Author: Marcus Eriksson <marc...@apache.org>
Authored: Tue Nov 17 10:07:04 2015 +0100
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Tue Nov 17 10:07:04 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                            |  1 +
 .../apache/cassandra/repair/messages/RepairOption.java |  3 +++
 .../org/apache/cassandra/service/StorageService.java   |  4 ++++
 .../cassandra/repair/messages/RepairOptionTest.java    | 13 +++++++++++--
 4 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/d434a33a/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 5705453,b6b394a..489a76d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,16 -1,5 +1,17 @@@
 -2.1.12
 +2.2.4
 + * Fix SimpleDateType type compatibility (CASSANDRA-10027)
 + * (Hadoop) fix splits calculation (CASSANDRA-10640)
 + * (Hadoop) ensure that Cluster instances are always closed (CASSANDRA-10058)
 + * (cqlsh) show partial trace if incomplete after max_trace_wait 
(CASSANDRA-7645)
 + * Use most up-to-date version of schema for system tables (CASSANDRA-10652)
 + * Deprecate memory_allocator in cassandra.yaml (CASSANDRA-10581,10628)
 + * Expose phi values from failure detector via JMX and tweak debug
 +   and trace logging (CASSANDRA-9526)
 + * Fix RangeNamesQueryPager (CASSANDRA-10509)
 + * Deprecate Pig support (CASSANDRA-10542)
 + * Reduce contention getting instances of CompositeType (CASSANDRA-10433)
 +Merged from 2.1:
+  * Reject incremental repair with subrange repair (CASSANDRA-10422)
   * Add a nodetool command to refresh size_estimates (CASSANDRA-9579)
   * Shutdown compaction in drain to prevent leak (CASSANDRA-10079)
   * Invalidate cache after stream receive task is completed (CASSANDRA-10341)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d434a33a/src/java/org/apache/cassandra/repair/messages/RepairOption.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/messages/RepairOption.java
index f3e452c,0000000..1780b6b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/repair/messages/RepairOption.java
+++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java
@@@ -1,308 -1,0 +1,311 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.repair.messages;
 +
 +import java.util.*;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.config.Config;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.repair.RepairParallelism;
 +import org.apache.cassandra.tools.nodetool.Repair;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +/**
 + * Repair options.
 + */
 +public class RepairOption
 +{
 +    public static final String PARALLELISM_KEY = "parallelism";
 +    public static final String PRIMARY_RANGE_KEY = "primaryRange";
 +    public static final String INCREMENTAL_KEY = "incremental";
 +    public static final String JOB_THREADS_KEY = "jobThreads";
 +    public static final String RANGES_KEY = "ranges";
 +    public static final String COLUMNFAMILIES_KEY = "columnFamilies";
 +    public static final String DATACENTERS_KEY = "dataCenters";
 +    public static final String HOSTS_KEY = "hosts";
 +    public static final String TRACE_KEY = "trace";
 +
 +    // we don't want to push nodes too much for repair
 +    public static final int MAX_JOB_THREADS = 4;
 +
 +    private static final Logger logger = 
LoggerFactory.getLogger(RepairOption.class);
 +
 +    /**
 +     * Construct RepairOptions object from given map of Strings.
 +     * <p>
 +     * Available options are:
 +     *
 +     * <table>
 +     *     <caption>Repair Options</caption>
 +     *     <thead>
 +     *         <tr>
 +     *             <th>key</th>
 +     *             <th>value</th>
 +     *             <th>default (when key not given)</th>
 +     *         </tr>
 +     *     </thead>
 +     *     <tbody>
 +     *         <tr>
 +     *             <td>parallelism</td>
 +     *             <td>"sequential", "parallel" or "dc_parallel"</td>
 +     *             <td>"sequential"</td>
 +     *         </tr>
 +     *         <tr>
 +     *             <td>primaryRange</td>
 +     *             <td>"true" if perform repair only on primary range.</td>
 +     *             <td>false</td>
 +     *         </tr>
 +     *         <tr>
 +     *             <td>incremental</td>
 +     *             <td>"true" if perform incremental repair.</td>
 +     *             <td>false</td>
 +     *         </tr>
 +     *         <tr>
 +     *             <td>trace</td>
 +     *             <td>"true" if repair is traced.</td>
 +     *             <td>false</td>
 +     *         </tr>
 +     *         <tr>
 +     *             <td>jobThreads</td>
 +     *             <td>Number of threads to use to run repair job.</td>
 +     *             <td>1</td>
 +     *         </tr>
 +     *         <tr>
 +     *             <td>ranges</td>
 +     *             <td>Ranges to repair. A range is expressed as &lt;start 
token&gt;:&lt;end token&gt;
 +     *             and multiple ranges can be given as comma separated 
ranges(e.g. aaa:bbb,ccc:ddd).</td>
 +     *             <td></td>
 +     *         </tr>
 +     *         <tr>
 +     *             <td>columnFamilies</td>
 +     *             <td>Specify names of ColumnFamilies to repair.
 +     *             Multiple ColumnFamilies can be given as comma separated 
values(e.g. cf1,cf2,cf3).</td>
 +     *             <td></td>
 +     *         </tr>
 +     *         <tr>
 +     *             <td>dataCenters</td>
 +     *             <td>Specify names of data centers who participate in this 
repair.
 +     *             Multiple data centers can be given as comma separated 
values(e.g. dc1,dc2,dc3).</td>
 +     *             <td></td>
 +     *         </tr>
 +     *         <tr>
 +     *             <td>hosts</td>
 +     *             <td>Specify names of hosts who participate in this repair.
 +     *             Multiple hosts can be given as comma separated values(e.g. 
cass1,cass2).</td>
 +     *             <td></td>
 +     *         </tr>
 +     *     </tbody>
 +     * </table>
 +     *
 +     * @param options options to parse
 +     * @param partitioner partitioner is used to construct token ranges
 +     * @return RepairOptions object
 +     */
 +    public static RepairOption parse(Map<String, String> options, 
IPartitioner partitioner)
 +    {
 +        // if no parallel option is given, then this will be "sequential" by 
default.
 +        RepairParallelism parallelism = 
RepairParallelism.fromName(options.get(PARALLELISM_KEY));
 +        boolean primaryRange = 
Boolean.parseBoolean(options.get(PRIMARY_RANGE_KEY));
 +        boolean incremental = 
Boolean.parseBoolean(options.get(INCREMENTAL_KEY));
 +        boolean trace = Boolean.parseBoolean(options.get(TRACE_KEY));
 +
 +        int jobThreads = 1;
 +        if (options.containsKey(JOB_THREADS_KEY))
 +        {
 +            try
 +            {
 +                jobThreads = Integer.parseInt(options.get(JOB_THREADS_KEY));
 +            }
 +            catch (NumberFormatException ignore) {}
 +        }
 +        // ranges
 +        String rangesStr = options.get(RANGES_KEY);
 +        Set<Range<Token>> ranges = new HashSet<>();
 +        if (rangesStr != null)
 +        {
++            if (incremental)
++                throw new IllegalArgumentException("Incremental repair can't 
be requested with subrange repair " +
++                                                   "because each subrange 
repair would generate an anti-compacted table");
 +            StringTokenizer tokenizer = new StringTokenizer(rangesStr, ",");
 +            while (tokenizer.hasMoreTokens())
 +            {
 +                String[] rangeStr = tokenizer.nextToken().split(":", 2);
 +                if (rangeStr.length < 2)
 +                {
 +                    continue;
 +                }
 +                Token parsedBeginToken = 
partitioner.getTokenFactory().fromString(rangeStr[0].trim());
 +                Token parsedEndToken = 
partitioner.getTokenFactory().fromString(rangeStr[1].trim());
 +                ranges.add(new Range<>(parsedBeginToken, parsedEndToken));
 +            }
 +        }
 +
 +        RepairOption option = new RepairOption(parallelism, primaryRange, 
incremental, trace, jobThreads, ranges);
 +
 +        // data centers
 +        String dataCentersStr = options.get(DATACENTERS_KEY);
 +        Collection<String> dataCenters = new HashSet<>();
 +        if (dataCentersStr != null)
 +        {
 +            StringTokenizer tokenizer = new StringTokenizer(dataCentersStr, 
",");
 +            while (tokenizer.hasMoreTokens())
 +            {
 +                dataCenters.add(tokenizer.nextToken().trim());
 +            }
 +            option.getDataCenters().addAll(dataCenters);
 +        }
 +
 +        // hosts
 +        String hostsStr = options.get(HOSTS_KEY);
 +        Collection<String> hosts = new HashSet<>();
 +        if (hostsStr != null)
 +        {
 +            StringTokenizer tokenizer = new StringTokenizer(hostsStr, ",");
 +            while (tokenizer.hasMoreTokens())
 +            {
 +                hosts.add(tokenizer.nextToken().trim());
 +            }
 +            option.getHosts().addAll(hosts);
 +        }
 +
 +        // columnfamilies
 +        String cfStr = options.get(COLUMNFAMILIES_KEY);
 +        if (cfStr != null)
 +        {
 +            Collection<String> columnFamilies = new HashSet<>();
 +            StringTokenizer tokenizer = new StringTokenizer(cfStr, ",");
 +            while (tokenizer.hasMoreTokens())
 +            {
 +                columnFamilies.add(tokenizer.nextToken().trim());
 +            }
 +            option.getColumnFamilies().addAll(columnFamilies);
 +        }
 +
 +        // validate options
 +        if (jobThreads > MAX_JOB_THREADS)
 +        {
 +            throw new IllegalArgumentException("Too many job threads. Max is 
" + MAX_JOB_THREADS);
 +        }
 +        if (primaryRange && (!dataCenters.isEmpty() || !hosts.isEmpty()))
 +        {
 +            throw new IllegalArgumentException("You need to run primary range 
repair on all nodes in the cluster.");
 +        }
 +
 +        return option;
 +    }
 +
 +    private final RepairParallelism parallelism;
 +    private final boolean primaryRange;
 +    private final boolean incremental;
 +    private final boolean trace;
 +    private final int jobThreads;
 +
 +    private final Collection<String> columnFamilies = new HashSet<>();
 +    private final Collection<String> dataCenters = new HashSet<>();
 +    private final Collection<String> hosts = new HashSet<>();
 +    private final Collection<Range<Token>> ranges = new HashSet<>();
 +
 +    public RepairOption(RepairParallelism parallelism, boolean primaryRange, 
boolean incremental, boolean trace, int jobThreads, Collection<Range<Token>> 
ranges)
 +    {
 +        if (FBUtilities.isWindows() &&
 +            (DatabaseDescriptor.getDiskAccessMode() != 
Config.DiskAccessMode.standard || DatabaseDescriptor.getIndexAccessMode() != 
Config.DiskAccessMode.standard) &&
 +            parallelism == RepairParallelism.SEQUENTIAL)
 +        {
 +            logger.warn("Sequential repair disabled when memory-mapped I/O is 
configured on Windows. Reverting to parallel.");
 +            this.parallelism = RepairParallelism.PARALLEL;
 +        }
 +        else
 +            this.parallelism = parallelism;
 +
 +        this.primaryRange = primaryRange;
 +        this.incremental = incremental;
 +        this.trace = trace;
 +        this.jobThreads = jobThreads;
 +        this.ranges.addAll(ranges);
 +    }
 +
 +    public RepairParallelism getParallelism()
 +    {
 +        return parallelism;
 +    }
 +
 +    public boolean isPrimaryRange()
 +    {
 +        return primaryRange;
 +    }
 +
 +    public boolean isIncremental()
 +    {
 +        return incremental;
 +    }
 +
 +    public boolean isTraced()
 +    {
 +        return trace;
 +    }
 +
 +    public int getJobThreads()
 +    {
 +        return jobThreads;
 +    }
 +
 +    public Collection<String> getColumnFamilies()
 +    {
 +        return columnFamilies;
 +    }
 +
 +    public Collection<Range<Token>> getRanges()
 +    {
 +        return ranges;
 +    }
 +
 +    public Collection<String> getDataCenters()
 +    {
 +        return dataCenters;
 +    }
 +
 +    public Collection<String> getHosts()
 +    {
 +        return hosts;
 +    }
 +
 +    public boolean isGlobal()
 +    {
 +        return dataCenters.isEmpty() && hosts.isEmpty();
 +    }
 +    @Override
 +    public String toString()
 +    {
 +        return "repair options (" +
 +                       "parallelism: " + parallelism +
 +                       ", primary range: " + primaryRange +
 +                       ", incremental: " + incremental +
 +                       ", job threads: " + jobThreads +
 +                       ", ColumnFamilies: " + columnFamilies +
 +                       ", dataCenters: " + dataCenters +
 +                       ", hosts: " + hosts +
 +                       ", # of ranges: " + ranges.size() +
 +                       ')';
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d434a33a/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 74b3c73,03c1960..b5ce38b
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -2958,47 -2819,25 +2958,51 @@@ public class StorageService extends Not
          {
              throw new IllegalArgumentException("Invalid parallelism degree 
specified: " + parallelismDegree);
          }
 -        Collection<Range<Token>> repairingRange = 
createRepairRangeFrom(beginToken, endToken);
 -
 -        logger.info("starting user-requested repair of range {} for keyspace 
{} and column families {}",
 -                           repairingRange, keyspaceName, columnFamilies);
 -
          RepairParallelism parallelism = 
RepairParallelism.values()[parallelismDegree];
 -        return forceRepairAsync(keyspaceName, parallelism, dataCenters, 
hosts, repairingRange, fullRepair, columnFamilies);
 -    }
 +        if (FBUtilities.isWindows() && parallelism != 
RepairParallelism.PARALLEL)
 +        {
 +            logger.warn("Snapshot-based repair is not yet supported on 
Windows.  Reverting to parallel repair.");
 +            parallelism = RepairParallelism.PARALLEL;
 +        }
+ 
 -    public int forceRepairRangeAsync(String beginToken, String endToken, 
String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, 
String... columnFamilies)
 -    {
+         if (!fullRepair)
 -            throw new IllegalArgumentException("Incremental repair can't be 
requested with subrange repair because " +
 -                                               "each subrange repair would 
generate an anti-compacted table");
++            throw new IllegalArgumentException("Incremental repair can't be 
requested with subrange repair " +
++                                               "because each subrange repair 
would generate an anti-compacted table");
          Collection<Range<Token>> repairingRange = 
createRepairRangeFrom(beginToken, endToken);
  
 +        RepairOption options = new RepairOption(parallelism, false, 
!fullRepair, false, 1, repairingRange);
 +        options.getDataCenters().addAll(dataCenters);
 +        if (hosts != null)
 +        {
 +            options.getHosts().addAll(hosts);
 +        }
 +        if (columnFamilies != null)
 +        {
 +            for (String columnFamily : columnFamilies)
 +            {
 +                options.getColumnFamilies().add(columnFamily);
 +            }
 +        }
 +
          logger.info("starting user-requested repair of range {} for keyspace 
{} and column families {}",
 -                           repairingRange, keyspaceName, columnFamilies);
 -        return forceRepairAsync(keyspaceName, isSequential, isLocal, 
repairingRange, fullRepair, columnFamilies);
 +                    repairingRange, keyspaceName, columnFamilies);
 +        return forceRepairAsync(keyspaceName, options);
 +    }
 +
 +    public int forceRepairRangeAsync(String beginToken,
 +                                     String endToken,
 +                                     String keyspaceName,
 +                                     boolean isSequential,
 +                                     boolean isLocal,
 +                                     boolean fullRepair,
 +                                     String... columnFamilies)
 +    {
 +        Set<String> dataCenters = null;
 +        if (isLocal)
 +        {
 +            dataCenters = 
Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter());
 +        }
 +        return forceRepairRangeAsync(beginToken, endToken, keyspaceName, 
isSequential, dataCenters, null, fullRepair, columnFamilies);
      }
  
      /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/d434a33a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
index 11ae69f,0000000..3257a10
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
+++ b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java
@@@ -1,96 -1,0 +1,105 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.repair.messages;
 +
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Map;
 +import java.util.Set;
 +
 +import org.junit.Test;
 +
++import com.google.common.collect.ImmutableMap;
++
 +import org.apache.cassandra.config.Config;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.dht.Murmur3Partitioner;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.repair.RepairParallelism;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +import static org.junit.Assert.*;
 +
 +public class RepairOptionTest
 +{
 +    @Test
 +    public void testParseOptions()
 +    {
 +        IPartitioner partitioner = Murmur3Partitioner.instance;
 +        Token.TokenFactory tokenFactory = partitioner.getTokenFactory();
 +
 +        // parse with empty options
 +        RepairOption option = RepairOption.parse(new HashMap<String, 
String>(), partitioner);
 +
 +        if (FBUtilities.isWindows() && 
(DatabaseDescriptor.getDiskAccessMode() != Config.DiskAccessMode.standard || 
DatabaseDescriptor.getIndexAccessMode() != Config.DiskAccessMode.standard))
 +            assertTrue(option.getParallelism() == RepairParallelism.PARALLEL);
 +        else
 +            assertTrue(option.getParallelism() == 
RepairParallelism.SEQUENTIAL);
 +
 +        assertFalse(option.isPrimaryRange());
 +        assertFalse(option.isIncremental());
 +
 +        // parse everything
 +        Map<String, String> options = new HashMap<>();
 +        options.put(RepairOption.PARALLELISM_KEY, "parallel");
 +        options.put(RepairOption.PRIMARY_RANGE_KEY, "false");
-         options.put(RepairOption.INCREMENTAL_KEY, "true");
++        options.put(RepairOption.INCREMENTAL_KEY, "false");
 +        options.put(RepairOption.RANGES_KEY, "0:10,11:20,21:30");
 +        options.put(RepairOption.COLUMNFAMILIES_KEY, "cf1,cf2,cf3");
 +        options.put(RepairOption.DATACENTERS_KEY, "dc1,dc2,dc3");
 +        options.put(RepairOption.HOSTS_KEY, "127.0.0.1,127.0.0.2,127.0.0.3");
 +
 +        option = RepairOption.parse(options, partitioner);
 +        assertTrue(option.getParallelism() == RepairParallelism.PARALLEL);
 +        assertFalse(option.isPrimaryRange());
-         assertTrue(option.isIncremental());
++        assertFalse(option.isIncremental());
 +
 +        Set<Range<Token>> expectedRanges = new HashSet<>(3);
 +        expectedRanges.add(new Range<>(tokenFactory.fromString("0"), 
tokenFactory.fromString("10")));
 +        expectedRanges.add(new Range<>(tokenFactory.fromString("11"), 
tokenFactory.fromString("20")));
 +        expectedRanges.add(new Range<>(tokenFactory.fromString("21"), 
tokenFactory.fromString("30")));
 +        assertEquals(expectedRanges, option.getRanges());
 +
 +        Set<String> expectedCFs = new HashSet<>(3);
 +        expectedCFs.add("cf1");
 +        expectedCFs.add("cf2");
 +        expectedCFs.add("cf3");
 +        assertEquals(expectedCFs, option.getColumnFamilies());
 +
 +        Set<String> expectedDCs = new HashSet<>(3);
 +        expectedDCs.add("dc1");
 +        expectedDCs.add("dc2");
 +        expectedDCs.add("dc3");
 +        assertEquals(expectedDCs, option.getDataCenters());
 +
 +        Set<String> expectedHosts = new HashSet<>(3);
 +        expectedHosts.add("127.0.0.1");
 +        expectedHosts.add("127.0.0.2");
 +        expectedHosts.add("127.0.0.3");
 +        assertEquals(expectedHosts, option.getHosts());
 +    }
++
++    @Test(expected=IllegalArgumentException.class)
++    public void testIncrementalRepairWithSubrangesThrows() throws Exception
++    {
++        RepairOption.parse(ImmutableMap.of(RepairOption.INCREMENTAL_KEY, 
"true", RepairOption.RANGES_KEY, ""),
++                           Murmur3Partitioner.instance);
++    }
 +}

Reply via email to