Show progress on nodetool repair command; patch by yukim, reviewed by Sylvain Lebresne for CASSANDRA-4767
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0906b7cc
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0906b7cc
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0906b7cc

Branch: refs/heads/cassandra-1.2
Commit: 0906b7cc5173770e04932432d40503f7c39eb61f
Parents: 55f936f
Author: Yuki Morishita <[email protected]>
Authored: Tue Jan 8 11:20:25 2013 -0600
Committer: Yuki Morishita <[email protected]>
Committed: Tue Jan 8 11:20:25 2013 -0600

NOTE(review): this copy of the patch is amended relative to the commit/index
hashes above; every amendment is line-neutral, so hunk headers and the
diffstat still hold.
  - StorageService.forceRepairAsync now returns 0 when there are no ranges to
    repair, as StorageServiceMBean documents.
    Previously it returned a positive command number but started no thread, so
    no notification was ever sent and RepairRunner.repairAndWait blocked
    forever in condition.await().
  - RepairRunner.cmd is volatile.
    It is written by the caller thread and read by handleNotification, which
    runs while the caller is blocked in condition.await().
  - Remaining hazards, NOT fixed here:
    (a) A notification delivered before forceRepairAsync returns is compared
        against cmd == 0 and dropped, so an early FINISHED can still be missed.
    (b) If runMayThrow throws before its final sendNotification, no FINISHED is
        ever sent.

----------------------------------------------------------------------
 CHANGES.txt                                        |  1 +
 .../cassandra/service/AntiEntropyService.java      | 10 ++
 .../apache/cassandra/service/StorageService.java   | 89 ++++++++++++++-
 .../cassandra/service/StorageServiceMBean.java     | 16 +++-
 src/java/org/apache/cassandra/tools/NodeCmd.java   | 10 +-
 src/java/org/apache/cassandra/tools/NodeProbe.java | 74 ++++++++++++
 6 files changed, 191 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0906b7cc/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c5c3863..5e87435 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@
  * fix specifying and altering crc_check_chance (CASSANDRA-5053)
  * Don't expire columns sooner than they should in 2ndary indexes (CASSANDRA-5079)
  * Pig: correctly decode row keys in widerow mode (CASSANDRA-5098)
+ * nodetool repair command now prints progress (CASSANDRA-4767)
 
 
 1.1.8


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0906b7cc/src/java/org/apache/cassandra/service/AntiEntropyService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AntiEntropyService.java b/src/java/org/apache/cassandra/service/AntiEntropyService.java
index 0d7c1b4..dc03122 100644
--- a/src/java/org/apache/cassandra/service/AntiEntropyService.java
+++ b/src/java/org/apache/cassandra/service/AntiEntropyService.java
@@ -104,6 +104,11 @@ public class AntiEntropyService
                                                                             "internal");
     }
 
+    public static enum Status
+    {
+        STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED
+    }
+
     /**
      * A map of active session.
      */
@@ -646,6 +651,11 @@ public class AntiEntropyService
             return sessionName;
         }
 
+        public Range<Token> getRange()
+        {
+            return range;
+        }
+
         RepairFuture getFuture()
         {
             return new RepairFuture(this);


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0906b7cc/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index a948786..ad05ce2 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -29,7 +29,10 @@ import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import javax.management.MBeanServer;
+import javax.management.Notification;
+import javax.management.NotificationBroadcasterSupport;
 import javax.management.ObjectName;
 
 import com.google.common.base.Supplier;
@@ -78,12 +81,15 @@ import org.apache.cassandra.utils.WrappedRunnable;
  * This class will also maintain histograms of the load information
  * of other nodes in the cluster.
  */
-public class StorageService implements IEndpointStateChangeSubscriber, StorageServiceMBean
+public class StorageService extends NotificationBroadcasterSupport implements IEndpointStateChangeSubscriber, StorageServiceMBean
 {
     private static Logger logger_ = LoggerFactory.getLogger(StorageService.class);
 
     public static final int RING_DELAY = getRingDelay(); // delay after which we assume ring has stablized
 
+    /* JMX notification serial number counter */
+    private final AtomicLong notificationSerialNumber = new AtomicLong();
+
     /* All verb handler identifiers */
     public enum Verb
     {
@@ -244,6 +250,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
 
     private static final AtomicInteger nextRepairCommand = new AtomicInteger();
 
+    private final ObjectName jmxObjectName;
+
     public void finishBootstrapping()
     {
         isBootstrapMode = false;
@@ -265,7 +273,8 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
         MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
         try
         {
-            mbs.registerMBean(this, new ObjectName("org.apache.cassandra.db:type=StorageService"));
+            jmxObjectName = new ObjectName("org.apache.cassandra.db:type=StorageService");
+            mbs.registerMBean(this, jmxObjectName);
         }
         catch (Exception e)
         {
@@ -1935,6 +1944,82 @@ public class StorageService implements IEndpointStateChangeSubscriber, StorageSe
     }
 
     /**
+     * Sends JMX notification to subscribers.
+     *
+     * @param type Message type
+     * @param message Message itself
+     * @param userObject Arbitrary object to attach to notification
+     */
+    public void sendNotification(String type, String message, Object userObject)
+    {
+        Notification jmxNotification = new Notification(type, jmxObjectName, notificationSerialNumber.incrementAndGet(), message);
+        jmxNotification.setUserData(userObject);
+        sendNotification(jmxNotification);
+    }
+
+    public int forceRepairAsync(final String tableName, final boolean isSequential, final boolean primaryRange, final String... columnFamilies)
+    {
+        if (Table.SYSTEM_TABLE.equals(tableName))
+            return 0;
+
+        final Collection<Range<Token>> ranges = primaryRange ? Collections.singletonList(getLocalPrimaryRange()) : getLocalRanges(tableName);
+        if (ranges.isEmpty())
+            return 0;
+
+        final int cmd = nextRepairCommand.incrementAndGet();
+        new Thread(new WrappedRunnable()
+        {
+            protected void runMayThrow() throws Exception
+            {
+                String message = String.format("Starting repair command #%d, repairing %d ranges for keyspace %s", cmd, ranges.size(), tableName);
+                logger_.info(message);
+                sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.STARTED.ordinal()});
+
+                List<AntiEntropyService.RepairFuture> futures = new ArrayList<AntiEntropyService.RepairFuture>(ranges.size());
+                for (Range<Token> range : ranges)
+                {
+                    AntiEntropyService.RepairFuture future = forceTableRepair(range, tableName, isSequential, columnFamilies);
+                    if (future == null)
+                        continue;
+                    futures.add(future);
+                    // wait for a session to be done with its differencing before starting the next one
+                    try
+                    {
+                        future.session.differencingDone.await();
+                    }
+                    catch (InterruptedException e)
+                    {
+                        message = "Interrupted while waiting for the differencing of repair session " + future.session + " to be done. Repair may be imprecise.";
+                        logger_.error(message, e);
+                        sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
+                    }
+                }
+                for (AntiEntropyService.RepairFuture future : futures)
+                {
+                    try
+                    {
+                        future.get();
+                        message = String.format("Repair session %s for range %s finished", future.session.getName(), future.session.getRange().toString());
+                        sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_SUCCESS.ordinal()});
+                    }
+                    catch (ExecutionException e)
+                    {
+                        message = String.format("Repair session %s for range %s failed with error %s", future.session.getName(), future.session.getRange().toString(), e.getCause().getMessage());
+                        sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
+                    }
+                    catch (Exception e)
+                    {
+                        message = String.format("Repair session %s for range %s failed with error %s", future.session.getName(), future.session.getRange().toString(), e.getMessage());
+                        sendNotification("repair", message, new int[]{cmd, AntiEntropyService.Status.SESSION_FAILED.ordinal()});
+                    }
+                }
+                sendNotification("repair", String.format("Repair command #%d finished", cmd), new int[]{cmd, AntiEntropyService.Status.FINISHED.ordinal()});
+            }
+        }).start();
+        return cmd;
+    }
+
+    /**
      * Trigger proactive repair for a table and column families.
      * @param tableName
      * @param columnFamilies


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0906b7cc/src/java/org/apache/cassandra/service/StorageServiceMBean.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
index c4c6a1d..c34faf3 100644
--- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java
@@ -26,12 +26,14 @@ import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
 
+import javax.management.NotificationEmitter;
+
 import org.apache.cassandra.config.ConfigurationException;
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.thrift.UnavailableException;
 
 
-public interface StorageServiceMBean
+public interface StorageServiceMBean extends NotificationEmitter
 {
     /**
      * Retrieve the list of live nodes in the cluster, where "liveness" is
@@ -242,6 +244,18 @@ public interface StorageServiceMBean
     public void forceTableFlush(String tableName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException;
 
     /**
+     * Invoke repair asynchronously.
+     * You can track repair progress by subscribing JMX notification sent from this StorageServiceMBean.
+     * Notification format is:
+     *   type: "repair"
+     *   userObject: int array of length 2, [0]=command number, [1]=ordinal of AntiEntropyService.Status
+     *
+     * @return Repair command number, or 0 if nothing to repair
+     * @see #forceTableRepair(String, boolean, String...)
+     */
+    public int forceRepairAsync(String tableName, boolean isSequential, boolean primaryRange, String... columnFamilies);
+
+    /**
      * Triggers proactive repair for given column families, or all columnfamilies for the given table
      * if none are explicitly listed.
      * @param tableName


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0906b7cc/src/java/org/apache/cassandra/tools/NodeCmd.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeCmd.java b/src/java/org/apache/cassandra/tools/NodeCmd.java
index bba96de..8d4f9a1 100644
--- a/src/java/org/apache/cassandra/tools/NodeCmd.java
+++ b/src/java/org/apache/cassandra/tools/NodeCmd.java
@@ -32,8 +32,6 @@ import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutionException;
 
-import org.apache.cassandra.service.CacheServiceMBean;
-import org.apache.cassandra.service.StorageProxyMBean;
 import org.apache.commons.cli.*;
 
 import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean;
@@ -42,6 +40,8 @@ import org.apache.cassandra.db.ColumnFamilyStoreMBean;
 import org.apache.cassandra.db.compaction.CompactionManagerMBean;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.net.MessagingServiceMBean;
+import org.apache.cassandra.service.CacheServiceMBean;
+import org.apache.cassandra.service.StorageProxyMBean;
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.utils.EstimatedHistogram;
 import org.apache.cassandra.utils.Pair;
@@ -1040,10 +1040,8 @@ public class NodeCmd
             {
                 case REPAIR  :
                     boolean snapshot = cmd.hasOption(SNAPSHOT_REPAIR_OPT.left);
-                    if (cmd.hasOption(PRIMARY_RANGE_OPT.left))
-                        probe.forceTableRepairPrimaryRange(keyspace, snapshot, columnFamilies);
-                    else
-                        probe.forceTableRepair(keyspace, snapshot, columnFamilies);
+                    boolean primaryRange = cmd.hasOption(PRIMARY_RANGE_OPT.left);
+                    probe.forceRepairAsync(System.out, keyspace, snapshot, primaryRange, columnFamilies);
                     break;
                 case FLUSH   :
                     try { probe.forceTableFlush(keyspace, columnFamilies); }


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0906b7cc/src/java/org/apache/cassandra/tools/NodeProbe.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 3ed4524..264ea90 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -26,10 +26,12 @@ import java.lang.management.MemoryUsage;
 import java.lang.management.RuntimeMXBean;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.text.SimpleDateFormat;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Condition;
 import javax.management.*;
 import javax.management.remote.JMXConnector;
 import javax.management.remote.JMXConnectorFactory;
@@ -52,6 +54,7 @@ import org.apache.cassandra.streaming.StreamingService;
 import org.apache.cassandra.streaming.StreamingServiceMBean;
 import org.apache.cassandra.thrift.InvalidRequestException;
 import org.apache.cassandra.thrift.UnavailableException;
+import org.apache.cassandra.utils.SimpleCondition;
 
 /**
  * JMX client operations for Cassandra.
@@ -204,6 +207,28 @@ public class NodeProbe
         ssProxy.forceTableRepair(tableName, isSequential, columnFamilies);
     }
 
+    public void forceRepairAsync(final PrintStream out, final String tableName, boolean isSequential, boolean primaryRange, String... columnFamilies) throws IOException
+    {
+        RepairRunner runner = new RepairRunner(out, tableName, columnFamilies);
+        try
+        {
+            ssProxy.addNotificationListener(runner, null, null);
+            runner.repairAndWait(ssProxy, isSequential, primaryRange);
+        }
+        catch (Exception e)
+        {
+            throw new IOException(e);
+        }
+        finally
+        {
+            try
+            {
+                ssProxy.removeNotificationListener(runner);
+            }
+            catch (ListenerNotFoundException ignored) {}
+        }
+    }
+
     public void forceTableRepairPrimaryRange(String tableName, boolean isSequential, String... columnFamilies) throws IOException
     {
         ssProxy.forceTableRepairPrimaryRange(tableName, isSequential, columnFamilies);
@@ -770,3 +795,52 @@ class ThreadPoolProxyMBeanIterator implements Iterator<Map.Entry<String, JMXEnab
         throw new UnsupportedOperationException();
     }
 }
+
+class RepairRunner implements NotificationListener
+{
+    private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
+    private final Condition condition = new SimpleCondition();
+    private final PrintStream out;
+    private final String keyspace;
+    private final String[] columnFamilies;
+    private volatile int cmd;
+
+    RepairRunner(PrintStream out, String keyspace, String... columnFamilies)
+    {
+        this.out = out;
+        this.keyspace = keyspace;
+        this.columnFamilies = columnFamilies;
+    }
+
+    public void repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, boolean primaryRangeOnly) throws InterruptedException
+    {
+        cmd = ssProxy.forceRepairAsync(keyspace, isSequential, primaryRangeOnly, columnFamilies);
+        if (cmd > 0)
+        {
+            condition.await();
+        }
+        else
+        {
+            String message = String.format("[%s] Nothing to repair for keyspace '%s'", format.format(System.currentTimeMillis()), keyspace);
+            out.println(message);
+        }
+    }
+
+    public void handleNotification(Notification notification, Object handback)
+    {
+        if ("repair".equals(notification.getType()))
+        {
+            // repair status is int array with [0] = cmd number, [1] = status
+            int[] status = (int[]) notification.getUserData();
+            assert status.length == 2;
+            // we only output what we invoked
+            if (cmd == status[0])
+            {
+                String message = String.format("[%s] %s", format.format(notification.getTimeStamp()), notification.getMessage());
+                out.println(message);
+                if (status[1] == AntiEntropyService.Status.FINISHED.ordinal())
+                    condition.signalAll();
+            }
+        }
+    }
+}
