Merge branch 'cassandra-2.1' into trunk
Conflicts:
src/java/org/apache/cassandra/service/StorageService.java
src/java/org/apache/cassandra/service/StorageServiceMBean.java
src/java/org/apache/cassandra/tools/NodeProbe.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9529fba6
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9529fba6
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9529fba6
Branch: refs/heads/trunk
Commit: 9529fba6679e903375ec935467de1017a1f4574f
Parents: ffb7f64 f8b43d4
Author: Yuki Morishita <[email protected]>
Authored: Tue Jan 6 16:02:12 2015 -0600
Committer: Yuki Morishita <[email protected]>
Committed: Tue Jan 6 16:02:12 2015 -0600
----------------------------------------------------------------------
.../cassandra/service/StorageService.java | 30 +++++++++++++-------
.../cassandra/service/StorageServiceMBean.java | 23 +++++++++++----
2 files changed, 37 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9529fba6/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 391f890,b961381..4740cd3
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -2430,153 -2495,122 +2430,163 @@@ public class StorageService extends Not
sendNotification(jmxNotification);
}
- public int forceRepairAsync(String keyspace, boolean isSequential,
Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange,
boolean fullRepair, String... columnFamilies) throws IOException
+ public int repairAsync(String keyspace, Map<String, String> repairSpec)
+ {
+ RepairOption option = RepairOption.parse(repairSpec,
getPartitioner());
+ // if ranges are not specified
+ if (option.getRanges().isEmpty())
+ {
+ if (option.isPrimaryRange())
+ {
+ // when repairing only primary range, neither dataCenters nor
hosts can be set
+ if (option.getDataCenters().isEmpty() &&
option.getHosts().isEmpty())
+ option.getRanges().addAll(getPrimaryRanges(keyspace));
+ // except dataCenters only contain local DC (i.e. -local)
+ else if (option.getDataCenters().size() == 1 &&
option.getDataCenters().contains(DatabaseDescriptor.getLocalDataCenter()))
+
option.getRanges().addAll(getPrimaryRangesWithinDC(keyspace));
+ else
+ throw new IllegalArgumentException("You need to run
primary range repair on all nodes in the cluster.");
+ }
+ else
+ {
+ option.getRanges().addAll(getLocalRanges(keyspace));
+ }
+ }
+ return forceRepairAsync(keyspace, option);
+ }
+
+ @Deprecated
+ public int forceRepairAsync(String keyspace,
+ boolean isSequential,
+ Collection<String> dataCenters,
+ Collection<String> hosts,
+ boolean primaryRange,
+ boolean fullRepair,
+ String... columnFamilies)
{
- return forceRepairAsync(keyspace, isSequential ?
RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts,
primaryRange, fullRepair, columnFamilies);
+ return forceRepairAsync(keyspace, isSequential ?
RepairParallelism.SEQUENTIAL.ordinal() : RepairParallelism.PARALLEL.ordinal(),
dataCenters, hosts, primaryRange, fullRepair, columnFamilies);
}
- public int forceRepairAsync(String keyspace, int parallelismDegree,
Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange,
boolean fullRepair, String... columnFamilies)
+ @Deprecated
+ public int forceRepairAsync(String keyspace,
- RepairParallelism parallelismDegree,
++ int parallelismDegree,
+ Collection<String> dataCenters,
+ Collection<String> hosts,
+ boolean primaryRange,
+ boolean fullRepair,
+ String... columnFamilies)
{
- if (FBUtilities.isWindows() && parallelismDegree !=
RepairParallelism.PARALLEL)
+ if (parallelismDegree < 0 || parallelismDegree >
RepairParallelism.values().length - 1)
+ {
+ throw new IllegalArgumentException("Invalid parallelism degree
specified: " + parallelismDegree);
+ }
- Collection<Range<Token>> ranges;
- if (primaryRange)
- {
- // when repairing only primary range, neither dataCenters nor
hosts can be set
- if (dataCenters == null && hosts == null)
- ranges = getPrimaryRanges(keyspace);
- // except dataCenters only contain local DC (i.e. -local)
- else if (dataCenters != null && dataCenters.size() == 1 &&
dataCenters.contains(DatabaseDescriptor.getLocalDataCenter()))
- ranges = getPrimaryRangesWithinDC(keyspace);
- else
- throw new IllegalArgumentException("You need to run primary
range repair on all nodes in the cluster.");
- }
- else
++ RepairParallelism parallelism =
RepairParallelism.values()[parallelismDegree];
++ if (FBUtilities.isWindows() && parallelism !=
RepairParallelism.PARALLEL)
{
- ranges = getLocalRanges(keyspace);
+ logger.warn("Snapshot-based repair is not yet supported on
Windows. Reverting to parallel repair.");
- parallelismDegree = RepairParallelism.PARALLEL;
++ parallelism = RepairParallelism.PARALLEL;
}
- RepairOption options = new RepairOption(parallelismDegree,
primaryRange, !fullRepair, false, 1, Collections.<Range<Token>>emptyList());
- return forceRepairAsync(keyspace,
RepairParallelism.values()[parallelismDegree], dataCenters, hosts, ranges,
fullRepair, columnFamilies);
- }
-
- public int forceRepairAsync(String keyspace, boolean isSequential,
Collection<String> dataCenters, Collection<String> hosts,
Collection<Range<Token>> ranges, boolean fullRepair, String... columnFamilies)
- {
- return forceRepairAsync(keyspace, isSequential ?
RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, dataCenters, hosts,
ranges, fullRepair, columnFamilies);
- }
-
- public int forceRepairAsync(String keyspace, RepairParallelism
parallelismDegree, Collection<String> dataCenters, Collection<String> hosts,
Collection<Range<Token>> ranges, boolean fullRepair, String... columnFamilies)
- {
- if (ranges.isEmpty() ||
Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor() < 2)
- return 0;
-
- int cmd = nextRepairCommand.incrementAndGet();
- if (ranges.size() > 0)
++ RepairOption options = new RepairOption(parallelism, primaryRange,
!fullRepair, false, 1, Collections.<Range<Token>>emptyList());
+ if (dataCenters != null)
{
- if (FBUtilities.isWindows() && parallelismDegree !=
RepairParallelism.PARALLEL)
- {
- logger.warn("Snapshot-based repair is not yet supported on
Windows. Reverting to parallel repair.");
- parallelismDegree = RepairParallelism.PARALLEL;
- }
- new Thread(createRepairTask(cmd, keyspace, ranges,
parallelismDegree, dataCenters, hosts, fullRepair, columnFamilies)).start();
+ options.getDataCenters().addAll(dataCenters);
}
- return cmd;
- }
-
- public int forceRepairAsync(String keyspace, boolean isSequential,
boolean isLocal, boolean primaryRange, boolean fullRepair, String...
columnFamilies)
- {
- Collection<Range<Token>> ranges;
- if (primaryRange)
+ if (hosts != null)
{
- ranges = isLocal ? getPrimaryRangesWithinDC(keyspace) :
getPrimaryRanges(keyspace);
+ options.getHosts().addAll(hosts);
}
- else
+ if (columnFamilies != null)
{
- ranges = getLocalRanges(keyspace);
+ for (String columnFamily : columnFamilies)
+ {
+ options.getColumnFamilies().add(columnFamily);
+ }
}
-
- return forceRepairAsync(keyspace, isSequential, isLocal, ranges,
fullRepair, columnFamilies);
+ return forceRepairAsync(keyspace, options);
}
- public int forceRepairAsync(String keyspace, boolean isSequential,
boolean isLocal, Collection<Range<Token>> ranges, boolean fullRepair, String...
columnFamilies)
+ public int forceRepairAsync(String keyspace,
+ boolean isSequential,
+ boolean isLocal,
+ boolean primaryRange,
+ boolean fullRepair,
+ String... columnFamilies)
{
- return forceRepairAsync(keyspace, isSequential ?
RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL, isLocal, ranges,
fullRepair, columnFamilies);
- }
-
- public int forceRepairAsync(String keyspace, RepairParallelism
parallelismDegree, boolean isLocal, Collection<Range<Token>> ranges, boolean
fullRepair, String... columnFamilies)
- {
- if (ranges.isEmpty() ||
Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor() < 2)
- return 0;
-
- int cmd = nextRepairCommand.incrementAndGet();
- if (FBUtilities.isWindows() && parallelismDegree !=
RepairParallelism.PARALLEL)
+ Set<String> dataCenters = null;
+ if (isLocal)
{
- logger.warn("Snapshot-based repair is not yet supported on
Windows. Reverting to parallel repair.");
- parallelismDegree = RepairParallelism.PARALLEL;
+ dataCenters =
Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter());
}
- new Thread(createRepairTask(cmd, keyspace, ranges, parallelismDegree,
isLocal, fullRepair, columnFamilies)).start();
- return cmd;
+ return forceRepairAsync(keyspace, isSequential, dataCenters, null,
primaryRange, fullRepair, columnFamilies);
}
- public int forceRepairRangeAsync(String beginToken, String endToken,
String keyspaceName, boolean isSequential, Collection<String> dataCenters,
Collection<String> hosts, boolean fullRepair, String... columnFamilies) throws
IOException
+ public int forceRepairRangeAsync(String beginToken,
+ String endToken,
+ String keyspaceName,
+ boolean isSequential,
+ Collection<String> dataCenters,
+ Collection<String> hosts,
+ boolean fullRepair,
+ String... columnFamilies)
{
- return forceRepairRangeAsync(beginToken, endToken, keyspaceName,
isSequential ? RepairParallelism.SEQUENTIAL : RepairParallelism.PARALLEL,
dataCenters, hosts, fullRepair, columnFamilies);
+ return forceRepairRangeAsync(beginToken, endToken, keyspaceName,
isSequential ? RepairParallelism.SEQUENTIAL.ordinal() :
RepairParallelism.PARALLEL.ordinal(), dataCenters, hosts, fullRepair,
columnFamilies);
}
- public int forceRepairRangeAsync(String beginToken, String endToken,
String keyspaceName, int parallelismDegree, Collection<String> dataCenters,
Collection<String> hosts, boolean fullRepair, String... columnFamilies)
+ public int forceRepairRangeAsync(String beginToken,
+ String endToken,
+ String keyspaceName,
- RepairParallelism parallelismDegree,
++ int parallelismDegree,
+ Collection<String> dataCenters,
+ Collection<String> hosts,
+ boolean fullRepair,
+ String... columnFamilies)
{
- if (FBUtilities.isWindows() && parallelismDegree !=
RepairParallelism.PARALLEL)
+ if (parallelismDegree < 0 || parallelismDegree >
RepairParallelism.values().length - 1)
+ {
+ throw new IllegalArgumentException("Invalid parallelism degree
specified: " + parallelismDegree);
+ }
++ RepairParallelism parallelism =
RepairParallelism.values()[parallelismDegree];
++ if (FBUtilities.isWindows() && parallelism !=
RepairParallelism.PARALLEL)
+ {
+ logger.warn("Snapshot-based repair is not yet supported on
Windows. Reverting to parallel repair.");
- parallelismDegree = RepairParallelism.PARALLEL;
++ parallelism = RepairParallelism.PARALLEL;
+ }
Collection<Range<Token>> repairingRange =
createRepairRangeFrom(beginToken, endToken);
- RepairOption options = new RepairOption(parallelismDegree, false,
!fullRepair, false, 1, repairingRange);
- logger.info("starting user-requested repair of range {} for keyspace
{} and column families {}",
- repairingRange, keyspaceName, columnFamilies);
++ RepairOption options = new RepairOption(parallelism, false,
!fullRepair, false, 1, repairingRange);
+ options.getDataCenters().addAll(dataCenters);
+ if (hosts != null)
+ {
+ options.getHosts().addAll(hosts);
+ }
+ if (columnFamilies != null)
+ {
+ for (String columnFamily : columnFamilies)
+ {
+ options.getColumnFamilies().add(columnFamily);
+ }
+ }
- RepairParallelism parallelism =
RepairParallelism.values()[parallelismDegree];
- return forceRepairAsync(keyspaceName, parallelism, dataCenters,
hosts, repairingRange, fullRepair, columnFamilies);
+ logger.info("starting user-requested repair of range {} for keyspace
{} and column families {}",
+ repairingRange, keyspaceName, columnFamilies);
+ return forceRepairAsync(keyspaceName, options);
}
- public int forceRepairRangeAsync(String beginToken, String endToken,
String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair,
String... columnFamilies)
+ public int forceRepairRangeAsync(String beginToken,
+ String endToken,
+ String keyspaceName,
+ boolean isSequential,
+ boolean isLocal,
+ boolean fullRepair,
+ String... columnFamilies)
{
- Collection<Range<Token>> repairingRange =
createRepairRangeFrom(beginToken, endToken);
-
- logger.info("starting user-requested repair of range {} for keyspace
{} and column families {}",
- repairingRange, keyspaceName, columnFamilies);
- return forceRepairAsync(keyspaceName, isSequential, isLocal,
repairingRange, fullRepair, columnFamilies);
+ Set<String> dataCenters = null;
+ if (isLocal)
+ {
+ dataCenters =
Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter());
+ }
+ return forceRepairRangeAsync(beginToken, endToken, keyspaceName,
isSequential, dataCenters, null, fullRepair, columnFamilies);
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/9529fba6/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 70b2b81,e0441fb..007ef31
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@@ -32,8 -30,8 +30,6 @@@ import java.util.concurrent.TimeoutExce
import javax.management.NotificationEmitter;
import javax.management.openmbean.TabularData;
--import org.apache.cassandra.db.compaction.CompactionManager;
--
public interface StorageServiceMBean extends NotificationEmitter
{
/**
@@@ -272,28 -270,48 +268,43 @@@
* type: "repair"
* userObject: int array of length 2, [0]=command number, [1]=ordinal
of AntiEntropyService.Status
*
+ * @param keyspace Keyspace name to repair. Should not be null.
+ * @param options repair option.
* @return Repair command number, or 0 if nothing to repair
*/
+ public int repairAsync(String keyspace, Map<String, String> options);
+
+ @Deprecated
public int forceRepairAsync(String keyspace, boolean isSequential,
Collection<String> dataCenters, Collection<String> hosts, boolean
primaryRange, boolean repairedAt, String... columnFamilies) throws IOException;
+ /**
+ * Invoke repair asynchronously.
+ * You can track repair progress by subscribing JMX notification sent
from this StorageServiceMBean.
+ * Notification format is:
+ * type: "repair"
+ * userObject: int array of length 2, [0]=command number, [1]=ordinal
of AntiEntropyService.Status
+ *
+ * @param parallelismDegree 0: sequential, 1: parallel, 2: DC parallel
+ * @return Repair command number, or 0 if nothing to repair
+ */
+ @Deprecated
- public int forceRepairAsync(String keyspace, RepairParallelism
parallelismDegree, Collection<String> dataCenters, Collection<String> hosts,
boolean primaryRange, boolean fullRepair, String... columnFamilies);
+ public int forceRepairAsync(String keyspace, int parallelismDegree,
Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange,
boolean fullRepair, String... columnFamilies);
- /**
- * Same as forceRepairAsync, but handles a specified range
- */
+ @Deprecated
public int forceRepairRangeAsync(String beginToken, String endToken,
String keyspaceName, boolean isSequential, Collection<String> dataCenters,
Collection<String> hosts, boolean repairedAt, String... columnFamilies) throws
IOException;
+ /**
+ * Same as forceRepairAsync, but handles a specified range
+ *
+ * @param parallelismDegree 0: sequential, 1: parallel, 2: DC parallel
+ */
+ @Deprecated
- public int forceRepairRangeAsync(String beginToken, String endToken,
String keyspaceName, RepairParallelism parallelismDegree, Collection<String>
dataCenters, Collection<String> hosts, boolean fullRepair, String...
columnFamilies);
+ public int forceRepairRangeAsync(String beginToken, String endToken,
String keyspaceName, int parallelismDegree, Collection<String> dataCenters,
Collection<String> hosts, boolean fullRepair, String... columnFamilies);
- /**
- * Invoke repair asynchronously.
- * You can track repair progress by subscribing JMX notification sent
from this StorageServiceMBean.
- * Notification format is:
- * type: "repair"
- * userObject: int array of length 2, [0]=command number, [1]=ordinal
of AntiEntropyService.Status
- *
- * @return Repair command number, or 0 if nothing to repair
- */
+ @Deprecated
public int forceRepairAsync(String keyspace, boolean isSequential,
boolean isLocal, boolean primaryRange, boolean fullRepair, String...
columnFamilies);
- /**
- * Same as forceRepairAsync, but handles a specified range
- */
+ @Deprecated
public int forceRepairRangeAsync(String beginToken, String endToken,
String keyspaceName, boolean isSequential, boolean isLocal, boolean repairedAt,
String... columnFamilies);
public void forceTerminateAllRepairSessions();