Merge branch 'cassandra-2.0' into cassandra-2.1
Conflicts:
src/java/org/apache/cassandra/service/StorageService.java
src/java/org/apache/cassandra/service/StorageServiceMBean.java
src/java/org/apache/cassandra/tools/NodeProbe.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/f8b43d4a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/f8b43d4a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/f8b43d4a
Branch: refs/heads/trunk
Commit: f8b43d4a811b0a7d9e88fb19d0aa4a6bf9117cc7
Parents: 136042e e20810c
Author: Yuki Morishita <[email protected]>
Authored: Tue Jan 6 14:26:05 2015 -0600
Committer: Yuki Morishita <[email protected]>
Committed: Tue Jan 6 14:26:05 2015 -0600
----------------------------------------------------------------------
.../cassandra/service/StorageService.java | 22 ++++++++++++++------
.../cassandra/service/StorageServiceMBean.java | 9 ++++----
.../org/apache/cassandra/tools/NodeProbe.java | 4 ++--
3 files changed, 23 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8b43d4a/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index cc23712,8085d7b..b961381
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -2495,39 -2412,27 +2495,43 @@@ public class StorageService extends Not
sendNotification(jmxNotification);
}
- public int forceRepairAsync(final String keyspace, final boolean
isSequential, final Collection<String> dataCenters, final Collection<String>
hosts, final boolean primaryRange, final String... columnFamilies)
+ public int forceRepairAsync(String keyspace, boolean isSequential,
Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange,
boolean fullRepair, String... columnFamilies) throws IOException
{
- return forceRepairAsync(keyspace, isSequential ?
RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts,
primaryRange, fullRepair, columnFamilies);
- return forceRepairAsync(keyspace, isSequential ?
RepairParallelism.SEQUENTIAL.ordinal() : RepairParallelism.PARALLEL.ordinal(),
dataCenters, hosts, primaryRange, columnFamilies);
++ return forceRepairAsync(keyspace, isSequential ?
RepairParallelism.SEQUENTIAL.ordinal() : RepairParallelism.PARALLEL.ordinal(),
dataCenters, hosts, primaryRange, fullRepair, columnFamilies);
}
- public int forceRepairAsync(String keyspace, RepairParallelism
parallelismDegree, Collection<String> dataCenters, Collection<String> hosts,
boolean primaryRange, boolean fullRepair, String... columnFamilies)
- public int forceRepairAsync(final String keyspace, final int
parallelismDegree, final Collection<String> dataCenters, final
Collection<String> hosts, final boolean primaryRange, final String...
columnFamilies)
++ public int forceRepairAsync(String keyspace, int parallelismDegree,
Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange,
boolean fullRepair, String... columnFamilies)
{
+ if (parallelismDegree < 0 || parallelismDegree >
RepairParallelism.values().length - 1)
+ {
+ throw new IllegalArgumentException("Invalid parallelism degree specified: " + parallelismDegree);
+ }
- // when repairing only primary range, dataCenter nor hosts can be set
- if (primaryRange && (dataCenters != null || hosts != null))
+ Collection<Range<Token>> ranges;
+ if (primaryRange)
+ {
+ // when repairing only primary range, neither dataCenters nor hosts can be set
+ if (dataCenters == null && hosts == null)
+ ranges = getPrimaryRanges(keyspace);
+ // except dataCenters only contain local DC (i.e. -local)
+ else if (dataCenters != null && dataCenters.size() == 1 &&
dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
+ ranges = getPrimaryRangesWithinDC(keyspace);
+ else
+ throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
+ }
+ else
{
- throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
+ ranges = getLocalRanges(keyspace);
}
- final Collection<Range<Token>> ranges = primaryRange ?
getLocalPrimaryRanges(keyspace) : getLocalRanges(keyspace);
- return forceRepairAsync(keyspace,
RepairParallelism.values()[parallelismDegree], dataCenters, hosts, ranges,
columnFamilies);
+
- return forceRepairAsync(keyspace, parallelismDegree, dataCenters,
hosts, ranges, fullRepair, columnFamilies);
++ return forceRepairAsync(keyspace,
RepairParallelism.values()[parallelismDegree], dataCenters, hosts, ranges,
fullRepair, columnFamilies);
}
- public int forceRepairAsync(final String keyspace, final
RepairParallelism parallelismDegree, final Collection<String> dataCenters,
final Collection<String> hosts, final Collection<Range<Token>> ranges, final
String... columnFamilies)
+ public int forceRepairAsync(String keyspace, boolean isSequential,
Collection<String> dataCenters, Collection<String> hosts,
Collection<Range<Token>> ranges, boolean fullRepair, String... columnFamilies)
+ {
+ return forceRepairAsync(keyspace, isSequential ?
RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts,
ranges, fullRepair, columnFamilies);
+ }
+
+ public int forceRepairAsync(String keyspace, RepairParallelism
parallelismDegree, Collection<String> dataCenters, Collection<String> hosts,
Collection<Range<Token>> ranges, boolean fullRepair, String... columnFamilies)
{
if (ranges.isEmpty() ||
Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor() < 2)
return 0;
@@@ -2580,21 -2471,62 +2584,27 @@@
return cmd;
}
- public int forceRepairRangeAsync(String beginToken, String endToken,
final String keyspaceName, boolean isSequential, Collection<String>
dataCenters, final Collection<String> hosts, final String... columnFamilies)
+ public int forceRepairRangeAsync(String beginToken, String endToken,
String keyspaceName, boolean isSequential, Collection<String> dataCenters,
Collection<String> hosts, boolean fullRepair, String... columnFamilies) throws
IOException
{
- return forceRepairRangeAsync(beginToken, endToken, keyspaceName,
isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL,
dataCenters, hosts, fullRepair, columnFamilies);
- return forceRepairRangeAsync(beginToken, endToken, keyspaceName,
isSequential ? RepairParallelism.SEQUENTIAL.ordinal() :
RepairParallelism.PARALLEL.ordinal(), dataCenters, hosts, columnFamilies);
++ return forceRepairRangeAsync(beginToken, endToken, keyspaceName,
isSequential ? RepairParallelism.SEQUENTIAL.ordinal() :
RepairParallelism.PARALLEL.ordinal(), dataCenters, hosts, fullRepair,
columnFamilies);
}
- public int forceRepairRangeAsync(String beginToken, String endToken,
String keyspaceName, RepairParallelism parallelismDegree, Collection<String>
dataCenters, Collection<String> hosts, boolean fullRepair, String...
columnFamilies)
- public int forceRepairRangeAsync(String beginToken, String endToken,
final String keyspaceName, int parallelismDegree, Collection<String>
dataCenters, final Collection<String> hosts, final String... columnFamilies)
++ public int forceRepairRangeAsync(String beginToken, String endToken,
String keyspaceName, int parallelismDegree, Collection<String> dataCenters,
Collection<String> hosts, boolean fullRepair, String... columnFamilies)
{
+ if (parallelismDegree < 0 || parallelismDegree >
RepairParallelism.values().length - 1)
+ {
+ throw new IllegalArgumentException("Invalid parallelism degree specified: " + parallelismDegree);
+ }
Collection<Range<Token>> repairingRange =
createRepairRangeFrom(beginToken, endToken);
logger.info("starting user-requested repair of range {} for keyspace {} and column families {}",
- repairingRange, keyspaceName, columnFamilies);
+ repairingRange, keyspaceName, columnFamilies);
- return forceRepairAsync(keyspaceName, parallelismDegree, dataCenters,
hosts, repairingRange, fullRepair, columnFamilies);
+
+ RepairParallelism parallelism =
RepairParallelism.values()[parallelismDegree];
- if (!FBUtilities.isUnix() && parallelism !=
RepairParallelism.PARALLEL)
- {
- logger.warn("Snapshot-based repair is not yet supported on Windows. Reverting to parallel repair.");
- parallelism = RepairParallelism.PARALLEL;
- }
- return forceRepairAsync(keyspaceName, parallelism, dataCenters,
hosts, repairingRange, columnFamilies);
- }
-
- public int forceRepairRangeAsync(String beginToken, String endToken,
final String keyspaceName, boolean isSequential, boolean isLocal, final
String... columnFamilies)
- {
- Set<String> dataCenters = null;
- if (isLocal)
- {
- dataCenters =
Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter());
- }
- return forceRepairRangeAsync(beginToken, endToken, keyspaceName,
isSequential, dataCenters, null, columnFamilies);
++ return forceRepairAsync(keyspaceName, parallelism, dataCenters,
hosts, repairingRange, fullRepair, columnFamilies);
}
- /**
- * Trigger proactive repair for a keyspace and column families.
- */
- public void forceKeyspaceRepair(final String keyspaceName, boolean
isSequential, boolean isLocal, final String... columnFamilies) throws
IOException
- {
- forceKeyspaceRepairRange(keyspaceName, getLocalRanges(keyspaceName),
isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL,
isLocal, columnFamilies);
- }
-
- public void forceKeyspaceRepairPrimaryRange(final String keyspaceName,
boolean isSequential, boolean isLocal, final String... columnFamilies) throws
IOException
- {
- // primary range repair can only be performed for whole cluster.
- // NOTE: we should omit the param but keep API as is for now.
- if (isLocal)
- {
- throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster.");
- }
-
- forceKeyspaceRepairRange(keyspaceName,
getLocalPrimaryRanges(keyspaceName), isSequential ?
RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, false,
columnFamilies);
- }
-
- public void forceKeyspaceRepairRange(String beginToken, String endToken,
final String keyspaceName, boolean isSequential, boolean isLocal, final
String... columnFamilies) throws IOException
+ public int forceRepairRangeAsync(String beginToken, String endToken,
String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair,
String... columnFamilies)
{
Collection<Range<Token>> repairingRange =
createRepairRangeFrom(beginToken, endToken);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8b43d4a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageServiceMBean.java
index a661b97,10d17fd..e0441fb
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@@ -283,9 -266,10 +281,10 @@@ public interface StorageServiceMBean ex
* type: "repair"
* userObject: int array of length 2, [0]=command number, [1]=ordinal of AntiEntropyService.Status
*
+ * @param parallelismDegree 0: sequential, 1: parallel, 2: DC parallel
* @return Repair command number, or 0 if nothing to repair
*/
- public int forceRepairAsync(String keyspace, RepairParallelism
parallelismDegree, Collection<String> dataCenters, Collection<String> hosts,
boolean primaryRange, boolean fullRepair, String... columnFamilies);
- public int forceRepairAsync(String keyspace, int parallelismDegree,
Collection<String> dataCenters, final Collection<String> hosts, boolean
primaryRange, String... columnFamilies);
++ public int forceRepairAsync(String keyspace, int parallelismDegree,
Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange,
boolean fullRepair, String... columnFamilies);
/**
* Same as forceRepairAsync, but handles a specified range
@@@ -294,8 -278,10 +293,10 @@@
/**
* Same as forceRepairAsync, but handles a specified range
+ *
+ * @param parallelismDegree 0: sequential, 1: parallel, 2: DC parallel
*/
- public int forceRepairRangeAsync(String beginToken, String endToken,
String keyspaceName, RepairParallelism parallelismDegree, Collection<String>
dataCenters, Collection<String> hosts, boolean fullRepair, String...
columnFamilies);
- public int forceRepairRangeAsync(String beginToken, String endToken,
final String keyspaceName, int parallelismDegree, Collection<String>
dataCenters, final Collection<String> hosts, final String... columnFamilies);
++ public int forceRepairRangeAsync(String beginToken, String endToken,
String keyspaceName, int parallelismDegree, Collection<String> dataCenters,
Collection<String> hosts, boolean fullRepair, String... columnFamilies);
/**
* Invoke repair asynchronously.
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f8b43d4a/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/NodeProbe.java
index 9c9e93d,6b28f18..00f9686
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@@ -1307,16 -1075,16 +1307,16 @@@ class RepairRunner implements Notificat
this.columnFamilies = columnFamilies;
}
- public boolean repairAndWait(StorageServiceMBean ssProxy,
RepairParallelism parallelismDegree, Collection<String> dataCenters, final
Collection<String> hosts, boolean primaryRangeOnly) throws Exception
+ public boolean repairAndWait(StorageServiceMBean ssProxy,
RepairParallelism parallelismDegree, Collection<String> dataCenters,
Collection<String> hosts, boolean primaryRangeOnly, boolean fullRepair) throws
Exception
{
- cmd = ssProxy.forceRepairAsync(keyspace, parallelismDegree,
dataCenters, hosts, primaryRangeOnly, fullRepair, columnFamilies);
- cmd = ssProxy.forceRepairAsync(keyspace, parallelismDegree.ordinal(),
dataCenters, hosts, primaryRangeOnly, columnFamilies);
++ cmd = ssProxy.forceRepairAsync(keyspace, parallelismDegree.ordinal(),
dataCenters, hosts, primaryRangeOnly, fullRepair, columnFamilies);
waitForRepair();
return success;
}
- public boolean repairRangeAndWait(StorageServiceMBean ssProxy,
RepairParallelism parallelismDegree, Collection<String> dataCenters, final
Collection<String> hosts, String startToken, String endToken) throws Exception
+ public boolean repairRangeAndWait(StorageServiceMBean ssProxy,
RepairParallelism parallelismDegree, Collection<String> dataCenters,
Collection<String> hosts, String startToken, String endToken, boolean
fullRepair) throws Exception
{
- cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace,
parallelismDegree, dataCenters, hosts, fullRepair, columnFamilies);
- cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace,
parallelismDegree.ordinal(), dataCenters, hosts, columnFamilies);
++ cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace,
parallelismDegree.ordinal(), dataCenters, hosts, fullRepair, columnFamilies);
waitForRepair();
return success;
}