Merge branch 'cassandra-1.1' into cassandra-1.2
Conflicts:
CHANGES.txt
src/java/org/apache/cassandra/service/StorageService.java
src/java/org/apache/cassandra/tools/NodeCmd.java
src/java/org/apache/cassandra/tools/NodeProbe.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3e8bffd0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3e8bffd0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3e8bffd0
Branch: refs/heads/cassandra-1.2
Commit: 3e8bffd000926b4f2dc92fd2bd4c34f8570e9cc1
Parents: 068b53d 0074840
Author: Brandon Williams <[email protected]>
Authored: Fri Feb 22 16:31:26 2013 -0600
Committer: Brandon Williams <[email protected]>
Committed: Fri Feb 22 16:31:26 2013 -0600
----------------------------------------------------------------------
CHANGES.txt | 13 +++++-
.../cassandra/service/AntiEntropyService.java | 2 +-
.../apache/cassandra/service/StorageService.java | 31 +++++++++++-
.../cassandra/service/StorageServiceMBean.java | 5 ++
src/java/org/apache/cassandra/tools/NodeCmd.java | 9 +++-
src/java/org/apache/cassandra/tools/NodeProbe.java | 38 +++++++++++++++
6 files changed, 92 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e8bffd0/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 8e77bf7,1fe1160..42a6e6d
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,83 -1,9 +1,94 @@@
-1.1.11
++1.2.3
++Merged from 1.1:
++=======
++ * nodetool: ability to repair specific range (CASSANDRA-5280)
++
+1.2.2
+ * fix potential for multiple concurrent compactions of the same sstables
+ (CASSANDRA-5256)
+ * avoid no-op caching of byte[] on commitlog append (CASSANDRA-5199)
+ * fix symlinks under data dir not working (CASSANDRA-5185)
+ * fix bug in compact storage metadata handling (CASSANDRA-5189)
+ * Validate login for USE queries (CASSANDRA-5207)
+ * cli: remove default username and password (CASSANDRA-5208)
+ * configure populate_io_cache_on_flush per-CF (CASSANDRA-4694)
+ * allow configuration of internode socket buffer (CASSANDRA-3378)
+ * Make sstable directory picking blacklist-aware again (CASSANDRA-5193)
+ * Correctly expire gossip states for edge cases (CASSANDRA-5216)
+ * Improve handling of directory creation failures (CASSANDRA-5196)
+ * Expose secondary indicies to the rest of nodetool (CASSANDRA-4464)
+ * Binary protocol: avoid sending notification for 0.0.0.0 (CASSANDRA-5227)
+ * add UseCondCardMark XX jvm settings on jdk 1.7 (CASSANDRA-4366)
+ * CQL3 refactor to allow conversion function (CASSANDRA-5226)
+ * Fix drop of sstables in some circumstance (CASSANDRA-5232)
+ * Implement caching of authorization results (CASSANDRA-4295)
+ * Add support for LZ4 compression (CASSANDRA-5038)
+ * Fix missing columns in wide rows queries (CASSANDRA-5225)
+ * Simplify auth setup and make system_auth ks alterable (CASSANDRA-5112)
+ * Stop compactions from hanging during bootstrap (CASSANDRA-5244)
+ * fix compressed streaming sending extra chunk (CASSANDRA-5105)
+ * Add CQL3-based implementations of IAuthenticator and IAuthorizer
+ (CASSANDRA-4898)
+ * Fix timestamp-based tomstone removal logic (CASSANDRA-5248)
* cli: Add JMX authentication support (CASSANDRA-5080)
+ * Fix forceFlush behavior (CASSANDRA-5241)
+ * cqlsh: Add username autocompletion (CASSANDRA-5231)
+ * Fix CQL3 composite partition key error (CASSANDRA-5240)
+ * Allow IN clause on last clustering key (CASSANDRA-5230)
+
+
+1.2.1
+ * stream undelivered hints on decommission (CASSANDRA-5128)
+ * GossipingPropertyFileSnitch loads saved dc/rack info if needed
(CASSANDRA-5133)
+ * drain should flush system CFs too (CASSANDRA-4446)
+ * add inter_dc_tcp_nodelay setting (CASSANDRA-5148)
+ * re-allow wrapping ranges for start_token/end_token range pairing
(CASSANDRA-5106)
+ * fix validation compaction of empty rows (CASSADRA-5136)
+ * nodetool methods to enable/disable hint storage/delivery (CASSANDRA-4750)
+ * disallow bloom filter false positive chance of 0 (CASSANDRA-5013)
+ * add threadpool size adjustment methods to JMXEnabledThreadPoolExecutor and
+ CompactionManagerMBean (CASSANDRA-5044)
+ * fix hinting for dropped local writes (CASSANDRA-4753)
+ * off-heap cache doesn't need mutable column container (CASSANDRA-5057)
+ * apply disk_failure_policy to bad disks on initial directory creation
+ (CASSANDRA-4847)
+ * Optimize name-based queries to use ArrayBackedSortedColumns
(CASSANDRA-5043)
+ * Fall back to old manifest if most recent is unparseable (CASSANDRA-5041)
+ * pool [Compressed]RandomAccessReader objects on the partitioned read path
+ (CASSANDRA-4942)
+ * Add debug logging to list filenames processed by Directories.migrateFile
+ method (CASSANDRA-4939)
+ * Expose black-listed directories via JMX (CASSANDRA-4848)
+ * Log compaction merge counts (CASSANDRA-4894)
+ * Minimize byte array allocation by AbstractData{Input,Output}
(CASSANDRA-5090)
+ * Add SSL support for the binary protocol (CASSANDRA-5031)
+ * Allow non-schema system ks modification for shuffle to work
(CASSANDRA-5097)
+ * cqlsh: Add default limit to SELECT statements (CASSANDRA-4972)
+ * cqlsh: fix DESCRIBE for 1.1 cfs in CQL3 (CASSANDRA-5101)
+ * Correctly gossip with nodes >= 1.1.7 (CASSANDRA-5102)
+ * Ensure CL guarantees on digest mismatch (CASSANDRA-5113)
+ * Validate correctly selects on composite partition key (CASSANDRA-5122)
+ * Fix exception when adding collection (CASSANDRA-5117)
+ * Handle states for non-vnode clusters correctly (CASSANDRA-5127)
+ * Refuse unrecognized replication and compaction strategy options
(CASSANDRA-4795)
+ * Pick the correct value validator in sstable2json for cql3 tables
(CASSANDRA-5134)
+ * Validate login for describe_keyspace, describe_keyspaces and set_keyspace
+ (CASSANDRA-5144)
+ * Fix inserting empty maps (CASSANDRA-5141)
+ * Don't remove tokens from System table for node we know (CASSANDRA-5121)
+ * fix streaming progress report for compresed files (CASSANDRA-5130)
+ * Coverage analysis for low-CL queries (CASSANDRA-4858)
+ * Stop interpreting dates as valid timeUUID value (CASSANDRA-4936)
+ * Adds E notation for floating point numbers (CASSANDRA-4927)
+ * Detect (and warn) unintentional use of the cql2 thrift methods when cql3
was
+ intended (CASSANDRA-5172)
- Merged from 1.1:
++
++
++1.1.11
+ * nodetool: ability to repair specific range (CASSANDRA-5280)
+
+
+ 1.1.10
* fix saved key cache not loading at startup (CASSANDRA-5166)
* fix ConcurrentModificationException in getBootstrapSource (CASSANDRA-5170)
* fix sstable maxtimestamp for row deletes and pre-1.1.1 sstables
(CASSANDRA-5153)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e8bffd0/src/java/org/apache/cassandra/service/AntiEntropyService.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e8bffd0/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageService.java
index 9ce4bf0,05401e0..11ae98b
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@@ -2306,21 -1851,95 +2306,36 @@@ public class StorageService extends Not
jmxNotification.setUserData(userObject);
sendNotification(jmxNotification);
}
--
- public int forceRepairAsync(final String tableName, final boolean
isSequential, final boolean primaryRange, final String... columnFamilies)
+ public int forceRepairAsync(final String keyspace, final boolean
isSequential, final boolean isLocal, final boolean primaryRange, final
String... columnFamilies)
{
- final Collection<Range<Token>> ranges = primaryRange ?
Collections.singletonList(getLocalPrimaryRange()) : getLocalRanges(tableName);
- return forceRepairAsync(tableName, isSequential, ranges,
columnFamilies);
++ final Collection<Range<Token>> ranges = primaryRange ?
Collections.singletonList(getLocalPrimaryRange()) : getLocalRanges(keyspace);
++ return forceRepairAsync(keyspace, isSequential, isLocal, ranges,
columnFamilies);
+ }
+
- public int forceRepairAsync(final String tableName, final boolean
isSequential, final Collection<Range<Token>> ranges, final String...
columnFamilies)
++ public int forceRepairAsync(final String keyspace, final boolean
isSequential, final boolean isLocal, final Collection<Range<Token>> ranges,
final String... columnFamilies)
+ {
- if (Table.SYSTEM_TABLE.equals(tableName))
+ if (Table.SYSTEM_KS.equals(keyspace) ||
Tracing.TRACE_KS.equals(keyspace))
return 0;
final int cmd = nextRepairCommand.incrementAndGet();
- final Collection<Range<Token>> ranges = primaryRange ?
Collections.singletonList(getLocalPrimaryRange()) : getLocalRanges(keyspace);
if (ranges.size() > 0)
{
- new Thread(new WrappedRunnable()
- {
- protected void runMayThrow() throws Exception
- {
- String message = String.format("Starting repair command
#%d, repairing %d ranges for keyspace %s", cmd, ranges.size(), tableName);
- logger_.info(message);
- sendNotification("repair", message, new int[]{cmd,
AntiEntropyService.Status.STARTED.ordinal()});
-
- List<AntiEntropyService.RepairFuture> futures = new
ArrayList<AntiEntropyService.RepairFuture>(ranges.size());
- for (Range<Token> range : ranges)
- {
- AntiEntropyService.RepairFuture future;
- try
- {
- future = forceTableRepair(range, tableName,
isSequential, columnFamilies);
- }
- catch (IllegalArgumentException e)
- {
- message = String.format("Repair session failed
with error: %s", e);
- sendNotification("repair", message, new
int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
- continue;
- }
- if (future == null)
- continue;
- futures.add(future);
- // wait for a session to be done with its
differencing before starting the next one
- try
- {
- future.session.differencingDone.await();
- }
- catch (InterruptedException e)
- {
- message = "Interrupted while waiting for the
differencing of repair session " + future.session + " to be done. Repair may be
imprecise.";
- logger_.error(message, e);
- sendNotification("repair", message, new
int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
- }
- }
- for (AntiEntropyService.RepairFuture future : futures)
- {
- try
- {
- future.get();
- message = String.format("Repair session %s for
range %s finished", future.session.getName(),
future.session.getRange().toString());
- sendNotification("repair", message, new
int[]{cmd, AntiEntropyService.Status.SESSION_SUCCESS.ordinal()});
- }
- catch (ExecutionException e)
- {
- message = String.format("Repair session %s for
range %s failed with error %s", future.session.getName(),
future.session.getRange().toString(), e.getCause().getMessage());
- sendNotification("repair", message, new
int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
- }
- catch (Exception e)
- {
- message = String.format("Repair session %s for
range %s failed with error %s", future.session.getName(),
future.session.getRange().toString(), e.getMessage());
- sendNotification("repair", message, new
int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
- }
- }
- sendNotification("repair", String.format("Repair command
#%d finished", cmd), new int[]{cmd,
AntiEntropyService.Status.FINISHED.ordinal()});
- }
- }).start();
+ new Thread(createRepairTask(cmd, keyspace, ranges, isSequential,
isLocal, columnFamilies)).start();
}
return cmd;
}
- public int forceRepairRangeAsync(String beginToken, String endToken,
final String tableName, boolean isSequential, final String... columnFamilies)
++ public int forceRepairRangeAsync(String beginToken, String endToken,
final String tableName, boolean isSequential, boolean isLocal, final String...
columnFamilies)
+ {
+ Token parsedBeginToken =
getPartitioner().getTokenFactory().fromString(beginToken);
+ Token parsedEndToken =
getPartitioner().getTokenFactory().fromString(endToken);
+
- logger_.info("starting user-requested repair of range ({}, {}] for
keyspace {} and column families {}",
++ logger.info("starting user-requested repair of range ({}, {}] for
keyspace {} and column families {}",
+ new Object[] {parsedBeginToken, parsedEndToken, tableName,
columnFamilies});
- return forceRepairAsync(tableName, isSequential,
Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)),
columnFamilies);
++ return forceRepairAsync(tableName, isSequential, isLocal,
Collections.singleton(new Range<Token>(parsedBeginToken, parsedEndToken)),
columnFamilies);
+ }
+
+
/**
* Trigger proactive repair for a table and column families.
* @param tableName
@@@ -2342,78 -2019,28 +2357,88 @@@
Token parsedBeginToken =
getPartitioner().getTokenFactory().fromString(beginToken);
Token parsedEndToken =
getPartitioner().getTokenFactory().fromString(endToken);
- logger_.info("starting user-requested repair of range ({}, {}] for
keyspace {} and column families {}",
- new Object[] {parsedBeginToken, parsedEndToken,
tableName, columnFamilies});
- AntiEntropyService.RepairFuture future = forceTableRepair(new
Range<Token>(parsedBeginToken, parsedEndToken), tableName, isSequential,
columnFamilies);
- if (future == null)
+ logger.info("starting user-requested repair of range ({}, {}] for
keyspace {} and column families {}",
+ parsedBeginToken, parsedEndToken, tableName,
columnFamilies);
+ forceTableRepairRange(tableName, Collections.singleton(new
Range<Token>(parsedBeginToken, parsedEndToken)), isSequential, isLocal,
columnFamilies);
+ }
+
+ public void forceTableRepairRange(final String tableName, final
Collection<Range<Token>> ranges, boolean isSequential, boolean isLocal, final
String... columnFamilies) throws IOException
+ {
+ if (Table.SYSTEM_KS.equals(tableName) ||
Tracing.TRACE_KS.equals(tableName))
return;
- try
- {
- future.get();
- }
- catch (Exception e)
+ createRepairTask(nextRepairCommand.incrementAndGet(), tableName,
ranges, isSequential, isLocal, columnFamilies).run();
+ }
+
+ private FutureTask<Object> createRepairTask(final int cmd, final String
keyspace, final Collection<Range<Token>> ranges, final boolean isSequential,
final boolean isLocal, final String... columnFamilies)
+ {
+ FutureTask<Object> task = new FutureTask<Object>(new WrappedRunnable()
{
- logger_.error("Repair session " + future.session.getName() + "
failed.", e);
- }
+ protected void runMayThrow() throws Exception
+ {
+ String message = String.format("Starting repair command #%d,
repairing %d ranges for keyspace %s", cmd, ranges.size(), keyspace);
+ logger.info(message);
+ sendNotification("repair", message, new int[]{cmd,
AntiEntropyService.Status.STARTED.ordinal()});
+
+ List<AntiEntropyService.RepairFuture> futures = new
ArrayList<AntiEntropyService.RepairFuture>(ranges.size());
+ for (Range<Token> range : ranges)
+ {
- AntiEntropyService.RepairFuture future =
forceTableRepair(range, keyspace, isSequential, isLocal, columnFamilies);
++ AntiEntropyService.RepairFuture future;
++ try
++ {
++ future = forceTableRepair(range, keyspace,
isSequential, isLocal, columnFamilies);
++ }
++ catch (IllegalArgumentException e)
++ {
++ logger.error("Repair session failed:", e);
++ sendNotification("repair", message, new int[]{cmd,
AntiEntropyService.Status.SESSION_FAILED.ordinal()});
++ continue;
++ }
+ if (future == null)
+ continue;
+ futures.add(future);
+ // wait for a session to be done with its differencing
before starting the next one
+ try
+ {
+ future.session.differencingDone.await();
+ }
+ catch (InterruptedException e)
+ {
+ message = "Interrupted while waiting for the
differencing of repair session " + future.session + " to be done. Repair may be
imprecise.";
+ logger.error(message, e);
+ sendNotification("repair", message, new int[]{cmd,
AntiEntropyService.Status.SESSION_FAILED.ordinal()});
+ }
+ }
+ for (AntiEntropyService.RepairFuture future : futures)
+ {
+ try
+ {
+ future.get();
+ message = String.format("Repair session %s for range
%s finished", future.session.getName(), future.session.getRange().toString());
+ sendNotification("repair", message, new int[]{cmd,
AntiEntropyService.Status.SESSION_SUCCESS.ordinal()});
+ }
+ catch (ExecutionException e)
+ {
+ message = String.format("Repair session %s for range
%s failed with error %s", future.session.getName(),
future.session.getRange().toString(), e.getCause().getMessage());
+ logger.error(message, e);
+ sendNotification("repair", message, new int[]{cmd,
AntiEntropyService.Status.SESSION_FAILED.ordinal()});
+ }
+ catch (Exception e)
+ {
+ message = String.format("Repair session %s for range
%s failed with error %s", future.session.getName(),
future.session.getRange().toString(), e.getMessage());
+ logger.error(message, e);
+ sendNotification("repair", message, new int[]{cmd,
AntiEntropyService.Status.SESSION_FAILED.ordinal()});
+ }
+ }
+ sendNotification("repair", String.format("Repair command #%d
finished", cmd), new int[]{cmd, AntiEntropyService.Status.FINISHED.ordinal()});
+ }
+ }, null);
+ return task;
}
- public AntiEntropyService.RepairFuture forceTableRepair(final
Range<Token> range, final String tableName, boolean isSequential, final
String... columnFamilies) throws IOException
+ public AntiEntropyService.RepairFuture forceTableRepair(final
Range<Token> range, final String tableName, boolean isSequential, boolean
isLocal, final String... columnFamilies) throws IOException
{
ArrayList<String> names = new ArrayList<String>();
- for (ColumnFamilyStore cfStore : getValidColumnFamilies(tableName,
columnFamilies))
+ for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false,
tableName, columnFamilies))
{
names.add(cfStore.getColumnFamilyName());
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e8bffd0/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/StorageServiceMBean.java
index 067d08a,1261d2a..aa0881b
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@@ -261,11 -251,16 +261,16 @@@ public interface StorageServiceMBean ex
* userObject: int array of length 2, [0]=command number, [1]=ordinal
of AntiEntropyService.Status
*
* @return Repair command number, or 0 if nothing to repair
- * @see #forceTableRepair(String, boolean, String...)
+ * @see #forceTableRepair(String, boolean, boolean, String...)
*/
- public int forceRepairAsync(String tableName, boolean isSequential,
boolean primaryRange, String... columnFamilies);
+ public int forceRepairAsync(String keyspace, boolean isSequential,
boolean isLocal, boolean primaryRange, String... columnFamilies);
/**
+ * Same as forceRepairAsync, but handles a specified range
+ */
- public int forceRepairRangeAsync(String beginToken, String endToken,
final String tableName, boolean isSequential, final String... columnFamilies);
++ public int forceRepairRangeAsync(String beginToken, String endToken,
final String tableName, boolean isSequential, boolean isLocal, final String...
columnFamilies);
+
+ /**
* Triggers proactive repair for given column families, or all
columnfamilies for the given table
* if none are explicitly listed.
* @param tableName
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e8bffd0/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/NodeCmd.java
index b74ffb7,99cbab1..9b844b1
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@@ -55,16 -48,16 +55,18 @@@ import org.apache.cassandra.utils.Pair
public class NodeCmd
{
- private static final Pair<String, String> SNAPSHOT_COLUMNFAMILY_OPT = new
Pair<String, String>("cf", "column-family");
- private static final Pair<String, String> HOST_OPT = new Pair<String,
String>("h", "host");
- private static final Pair<String, String> PORT_OPT = new Pair<String,
String>("p", "port");
- private static final Pair<String, String> USERNAME_OPT = new Pair<String,
String>("u", "username");
- private static final Pair<String, String> PASSWORD_OPT = new Pair<String,
String>("pw", "password");
- private static final Pair<String, String> TAG_OPT = new Pair<String,
String>("t", "tag");
- private static final Pair<String, String> PRIMARY_RANGE_OPT = new
Pair<String, String>("pr", "partitioner-range");
- private static final Pair<String, String> START_TOKEN_OPT = new
Pair<String, String>("st", "start-token");
- private static final Pair<String, String> END_TOKEN_OPT = new
Pair<String, String>("et", "end-token");
- private static final Pair<String, String> SNAPSHOT_REPAIR_OPT = new
Pair<String, String>("snapshot", "with-snapshot");
+ private static final Pair<String, String> SNAPSHOT_COLUMNFAMILY_OPT =
Pair.create("cf", "column-family");
+ private static final Pair<String, String> HOST_OPT = Pair.create("h",
"host");
+ private static final Pair<String, String> PORT_OPT = Pair.create("p",
"port");
+ private static final Pair<String, String> USERNAME_OPT = Pair.create("u",
"username");
+ private static final Pair<String, String> PASSWORD_OPT =
Pair.create("pw", "password");
+ private static final Pair<String, String> TAG_OPT = Pair.create("t",
"tag");
+ private static final Pair<String, String> TOKENS_OPT = Pair.create("T",
"tokens");
+ private static final Pair<String, String> PRIMARY_RANGE_OPT =
Pair.create("pr", "partitioner-range");
+ private static final Pair<String, String> SNAPSHOT_REPAIR_OPT =
Pair.create("snapshot", "with-snapshot");
+ private static final Pair<String, String> LOCAL_DC_REPAIR_OPT =
Pair.create("local", "in-local-dc");
++ private static final Pair<String, String> START_TOKEN_OPT =
Pair.create("st", "start-token");
++ private static final Pair<String, String> END_TOKEN_OPT =
Pair.create("et", "end-token");
private static final String DEFAULT_HOST = "127.0.0.1";
private static final int DEFAULT_PORT = 7199;
@@@ -81,10 -76,10 +83,12 @@@
options.addOption(USERNAME_OPT, true, "remote jmx agent username");
options.addOption(PASSWORD_OPT, true, "remote jmx agent password");
options.addOption(TAG_OPT, true, "optional name to give a
snapshot");
+ options.addOption(TOKENS_OPT, false, "display all tokens");
options.addOption(PRIMARY_RANGE_OPT, false, "only repair the first
range returned by the partitioner for the node");
options.addOption(SNAPSHOT_REPAIR_OPT, false, "repair one node at a
time using snapshots");
+ options.addOption(LOCAL_DC_REPAIR_OPT, false, "only repair against
nodes in the same datacenter");
+ options.addOption(START_TOKEN_OPT, true, "token at which repair range
starts");
+ options.addOption(END_TOKEN_OPT, true, "token at which repair range
ends");
}
public NodeCmd(NodeProbe probe)
@@@ -1345,9 -1044,11 +1349,12 @@@
{
case REPAIR :
boolean snapshot =
cmd.hasOption(SNAPSHOT_REPAIR_OPT.left);
+ boolean localDC = cmd.hasOption(LOCAL_DC_REPAIR_OPT.left);
boolean primaryRange =
cmd.hasOption(PRIMARY_RANGE_OPT.left);
- probe.forceRepairAsync(System.out, keyspace, snapshot,
localDC, primaryRange, columnFamilies);
+ if (cmd.hasOption(START_TOKEN_OPT.left) ||
cmd.hasOption(END_TOKEN_OPT.left))
- probe.forceRepairRangeAsync(System.out, keyspace,
snapshot, cmd.getOptionValue(START_TOKEN_OPT.left),
cmd.getOptionValue(END_TOKEN_OPT.left), columnFamilies);
++ probe.forceRepairRangeAsync(System.out, keyspace,
snapshot, localDC, cmd.getOptionValue(START_TOKEN_OPT.left),
cmd.getOptionValue(END_TOKEN_OPT.left), columnFamilies);
+ else
- probe.forceRepairAsync(System.out, keyspace,
snapshot, primaryRange, columnFamilies);
++ probe.forceRepairAsync(System.out, keyspace,
snapshot, localDC, primaryRange, columnFamilies);
break;
case FLUSH :
try { probe.forceTableFlush(keyspace, columnFamilies); }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3e8bffd0/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/tools/NodeProbe.java
index ed93359,44e64c4..ce4407f
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@@ -235,14 -231,37 +235,37 @@@ public class NodeProb
}
}
- public void forceRepairRangeAsync(final PrintStream out, final String
tableName, boolean isSequential, final String startToken, final String
endToken, String... columnFamilies) throws IOException
++ public void forceRepairRangeAsync(final PrintStream out, final String
tableName, boolean isSequential, boolean isLocal, final String startToken,
final String endToken, String... columnFamilies) throws IOException
+ {
+ RepairRunner runner = new RepairRunner(out, tableName,
columnFamilies);
+ try
+ {
+ ssProxy.addNotificationListener(runner, null, null);
- if (!runner.repairRangeAndWait(ssProxy, isSequential,
startToken, endToken))
++ if (!runner.repairRangeAndWait(ssProxy, isSequential, isLocal,
startToken, endToken))
+ failed = true;
+ }
+ catch (Exception e)
+ {
+ throw new IOException(e) ;
+ }
+ finally
+ {
+ try
+ {
+ ssProxy.removeNotificationListener(runner);
+ }
+ catch (ListenerNotFoundException ignored) {}
+ }
+ }
+
- public void forceTableRepairPrimaryRange(String tableName, boolean
isSequential, String... columnFamilies) throws IOException
+ public void forceTableRepairPrimaryRange(String tableName, boolean
isSequential, boolean isLocal, String... columnFamilies) throws IOException
{
- ssProxy.forceTableRepairPrimaryRange(tableName, isSequential,
columnFamilies);
+ ssProxy.forceTableRepairPrimaryRange(tableName, isSequential,
isLocal, columnFamilies);
}
- public void forceTableRepairRange(String beginToken, String endToken,
String tableName, boolean isSequential, String... columnFamilies) throws
IOException
+ public void forceTableRepairRange(String beginToken, String endToken,
String tableName, boolean isSequential, boolean isLocal, String...
columnFamilies) throws IOException
{
- ssProxy.forceTableRepairRange(beginToken, endToken, tableName,
isSequential, columnFamilies);
+ ssProxy.forceTableRepairRange(beginToken, endToken, tableName,
isSequential, isLocal, columnFamilies);
}
public void invalidateKeyCache() throws IOException
@@@ -929,6 -853,21 +952,21 @@@ class RepairRunner implements Notificat
else
{
String message = String.format("[%s] Nothing to repair for
keyspace '%s'", format.format(System.currentTimeMillis()), keyspace);
+ out.println(message);
+ }
+ return success;
+ }
+
- public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean
isSequential, String startToken, String endToken) throws InterruptedException
++ public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean
isSequential, boolean isLocal, String startToken, String endToken) throws
InterruptedException
+ {
- cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace,
isSequential, columnFamilies);
++ cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace,
isSequential, isLocal, columnFamilies);
+ if (cmd > 0)
+ {
+ condition.await();
+ }
+ else
+ {
+ String message = String.format("[%s] Nothing to repair for
keyspace '%s'", format.format(System.currentTimeMillis()), keyspace);
out.println(message);
}
return success;