Repository: cassandra Updated Branches: refs/heads/trunk 6e66cf5f2 -> 5b88a0f62
Improve nodetool cleanup/scrub/upgradesstables failure handling. Patch by marcuse, reviewed by jbellis for CASSANDRA-6774 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/17945ab0 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/17945ab0 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/17945ab0 Branch: refs/heads/trunk Commit: 17945ab0d0bd65d35f99e4577f5c516aa60423f1 Parents: 35b2151 Author: Marcus Eriksson <marc...@apache.org> Authored: Tue Mar 18 09:38:14 2014 +0100 Committer: Marcus Eriksson <marc...@apache.org> Committed: Tue Mar 18 09:46:26 2014 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/ColumnFamilyStore.java | 17 +-- .../db/compaction/CompactionManager.java | 105 ++++++++++++++----- .../SizeTieredCompactionStrategy.java | 2 +- .../cassandra/service/StorageService.java | 30 ++++-- .../cassandra/service/StorageServiceMBean.java | 8 +- .../org/apache/cassandra/tools/NodeProbe.java | 46 ++++++-- .../org/apache/cassandra/tools/NodeTool.java | 6 +- 8 files changed, 161 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/17945ab0/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 04e16a0..384d995 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -22,6 +22,7 @@ * Fix stress smart Thrift client to pick servers correctly (CASSANDRA-6848) * Add logging levels (minimal, normal or verbose) to stress tool (CASSANDRA-6849) * Fix race condition in Batch CLE (CASSANDRA-6860) + * Improve cleanup/scrub/upgradesstables failure handling (CASSANDRA-6774) Merged from 2.0: * Update hadoop_cql3_word_count example (CASSANDRA-6793) * Fix handling of RejectedExecution in sync Thrift server (CASSANDRA-6788) http://git-wip-us.apache.org/repos/asf/cassandra/blob/17945ab0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 40632b2..e116574 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -1294,22 +1294,22 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean return maxFile; } - public void forceCleanup() throws ExecutionException, InterruptedException + public CompactionManager.AllSSTableOpStatus forceCleanup() throws ExecutionException, InterruptedException { - CompactionManager.instance.performCleanup(ColumnFamilyStore.this); + return CompactionManager.instance.performCleanup(ColumnFamilyStore.this); } - public void scrub(boolean disableSnapshot, boolean skipCorrupted) throws ExecutionException, InterruptedException + public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted) throws ExecutionException, InterruptedException { // skip snapshot creation during scrub, SEE JIRA 5891 if(!disableSnapshot) snapshotWithoutFlush("pre-scrub-" + System.currentTimeMillis()); - CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted); + return CompactionManager.instance.performScrub(ColumnFamilyStore.this, skipCorrupted); } - public void sstablesRewrite(boolean excludeCurrentVersion) throws ExecutionException, InterruptedException + public CompactionManager.AllSSTableOpStatus sstablesRewrite(boolean excludeCurrentVersion) throws ExecutionException, InterruptedException { - CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, excludeCurrentVersion); + return CompactionManager.instance.performSSTableRewrite(ColumnFamilyStore.this, excludeCurrentVersion); } public void markObsolete(Collection<SSTableReader> sstables, OperationType compactionType) @@ -2414,7 +2414,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { if (!cfs.getDataTracker().getCompacting().isEmpty()) { - logger.warn("Unable to cancel in-progress compactions for {}. Probably there is an unusually large row in progress somewhere. It is also possible that buggy code left some sstables compacting after it was done with them", metadata.cfName); + logger.warn("Unable to cancel in-progress compactions for {}. Perhaps there is an unusually large row in progress somewhere, or the system is simply overloaded.", metadata.cfName); + return null; } } logger.debug("Compactions successfully cancelled"); @@ -2446,7 +2447,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean assert data.getCompacting().isEmpty() : data.getCompacting(); Iterable<SSTableReader> sstables = Lists.newArrayList(AbstractCompactionStrategy.filterSuspectSSTables(getSSTables())); if (Iterables.isEmpty(sstables)) - return null; + return Collections.emptyList(); boolean success = data.markCompacting(sstables); assert success : "something marked things compacting while compactions are disabled"; return sstables; http://git-wip-us.apache.org/repos/asf/cassandra/blob/17945ab0/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index d44ff46..e28cfef 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -209,45 +209,80 @@ public class CompactionManager implements CompactionManagerMBean } } - private static interface AllSSTablesOperation + private abstract static class UnmarkingRunnable implements Runnable { - public void perform(ColumnFamilyStore store, Iterable<SSTableReader> sstables) throws IOException; + private final ColumnFamilyStore cfs; + private final Iterable<SSTableReader> sstables; + + private UnmarkingRunnable(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables) + { + this.cfs = cfs; + this.sstables = sstables; + } + + protected abstract void runMayThrow() throws IOException; + + public final void run() + { + try + { + runMayThrow(); + } + catch (Exception e) + { + throw Throwables.propagate(e); + } + finally + { + cfs.getDataTracker().unmarkCompacting(sstables); + } + } } - private void performAllSSTableOperation(final ColumnFamilyStore cfs, final AllSSTablesOperation operation) throws InterruptedException, ExecutionException + public enum AllSSTableOpStatus { ABORTED, SUCCESSFUL } + + public AllSSTableOpStatus performScrub(final ColumnFamilyStore cfs, final boolean skipCorrupted) throws InterruptedException, ExecutionException { final Iterable<SSTableReader> sstables = cfs.markAllCompacting(); if (sstables == null) - return; + { + logger.info("Aborting scrub of {}.{} after failing to interrupt other compaction operations", cfs.keyspace.getName(), cfs.name); + return AllSSTableOpStatus.ABORTED; + } + if (Iterables.isEmpty(sstables)) + { + logger.info("No sstables to scrub for {}.{}", cfs.keyspace.getName(), cfs.name); + return AllSSTableOpStatus.SUCCESSFUL; + } - Callable<Object> runnable = new Callable<Object>() + Runnable runnable = new UnmarkingRunnable(cfs, sstables) { - public Object call() throws IOException + protected void runMayThrow() throws IOException { - operation.perform(cfs, sstables); - cfs.getDataTracker().unmarkCompacting(sstables); - return this; + doScrub(cfs, sstables, skipCorrupted); } }; executor.submit(runnable).get(); + return AllSSTableOpStatus.SUCCESSFUL; } - public void performScrub(ColumnFamilyStore cfStore, final boolean skipCorrupted) throws InterruptedException, ExecutionException + public AllSSTableOpStatus performSSTableRewrite(final ColumnFamilyStore cfs, final boolean excludeCurrentVersion) throws InterruptedException, ExecutionException { - performAllSSTableOperation(cfStore, new AllSSTablesOperation() + final Iterable<SSTableReader> sstables = cfs.markAllCompacting(); + if (sstables == null) { - public void perform(ColumnFamilyStore store, Iterable<SSTableReader> sstables) throws IOException - { - doScrub(store, sstables, skipCorrupted); - } - }); - } + logger.info("Aborting sstable format upgrade of {}.{} after failing to interrupt other compaction operations", cfs.keyspace.getName(), cfs.name); + return AllSSTableOpStatus.ABORTED; + } + if (Iterables.isEmpty(sstables)) + { + logger.info("No sstables to upgrade for {}.{}", cfs.keyspace.getName(), cfs.name); + return AllSSTableOpStatus.SUCCESSFUL; + } - public void performSSTableRewrite(ColumnFamilyStore cfStore, final boolean excludeCurrentVersion) throws InterruptedException, ExecutionException - { - performAllSSTableOperation(cfStore, new AllSSTablesOperation() + Runnable runnable = new UnmarkingRunnable(cfs, sstables) { - public void perform(ColumnFamilyStore cfs, Iterable<SSTableReader> sstables) + protected void runMayThrow() throws IOException { for (final SSTableReader sstable : sstables) { @@ -262,23 +297,39 @@ public class CompactionManager implements CompactionManagerMBean task.execute(metrics); } } - }); + }; + executor.submit(runnable).get(); + return AllSSTableOpStatus.SUCCESSFUL; } - public void performCleanup(ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException + public AllSSTableOpStatus performCleanup(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException { - performAllSSTableOperation(cfStore, new AllSSTablesOperation() + final Iterable<SSTableReader> sstables = cfStore.markAllCompacting(); + if (sstables == null) + { + logger.info("Aborting cleanup of {}.{} after failing to interrupt other compaction operations", cfStore.keyspace.getName(), cfStore.name); + return AllSSTableOpStatus.ABORTED; + } + if (Iterables.isEmpty(sstables)) { - public void perform(ColumnFamilyStore store, Iterable<SSTableReader> sstables) throws IOException + logger.info("No sstables to cleanup for {}.{}", cfStore.keyspace.getName(), cfStore.name); + return AllSSTableOpStatus.SUCCESSFUL; + } + + Runnable runnable = new UnmarkingRunnable(cfStore, sstables) + { + protected void runMayThrow() throws IOException { // Sort the column families in order of SSTable size, so cleanup of smaller CFs // can free up space for larger ones List<SSTableReader> sortedSSTables = Lists.newArrayList(sstables); Collections.sort(sortedSSTables, new SSTableReader.SizeComparator()); - doCleanupCompaction(store, sortedSSTables); + doCleanupCompaction(cfStore, sortedSSTables); } - }); + }; + executor.submit(runnable).get(); + return AllSSTableOpStatus.SUCCESSFUL; } public Future<?> submitAntiCompaction(final ColumnFamilyStore cfs, http://git-wip-us.apache.org/repos/asf/cassandra/blob/17945ab0/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java index 63d983c..763d20b 100644 --- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java @@ -277,7 +277,7 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy public Collection<AbstractCompactionTask> getMaximalTask(final int gcBefore) { Iterable<SSTableReader> allSSTables = cfs.markAllCompacting(); - if (allSSTables == null) + if (allSSTables == null || Iterables.isEmpty(allSSTables)) return null; Set<SSTableReader> sstables = Sets.newHashSet(allSSTables); Set<SSTableReader> repaired = new HashSet<>(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/17945ab0/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index d9026d0..6d49a8f 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -46,6 +46,7 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.Uninterruptibles; import org.apache.commons.lang3.StringUtils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,6 +60,7 @@ import org.apache.cassandra.config.KSMetaData; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.*; import org.apache.cassandra.db.commitlog.CommitLog; +import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.dht.*; import org.apache.cassandra.dht.Range; @@ -2166,27 +2168,43 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return Gossiper.instance.getCurrentGenerationNumber(FBUtilities.getBroadcastAddress()); } - public void forceKeyspaceCleanup(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + public CompactionManager.AllSSTableOpStatus forceKeyspaceCleanup(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { if (keyspaceName.equals(Keyspace.SYSTEM_KS)) throw new RuntimeException("Cleanup of the system keyspace is neither necessary nor wise"); + CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL; for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, columnFamilies)) { - cfStore.forceCleanup(); + CompactionManager.AllSSTableOpStatus oneStatus = cfStore.forceCleanup(); + if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL) + status = oneStatus; } + return status; } - public void scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { + CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL; for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, columnFamilies)) - cfStore.scrub(disableSnapshot, skipCorrupted); + { + CompactionManager.AllSSTableOpStatus oneStatus = cfStore.scrub(disableSnapshot, skipCorrupted); + if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL) + status = oneStatus; + } + return status; } - public void upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + public CompactionManager.AllSSTableOpStatus upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { + CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL; for (ColumnFamilyStore cfStore : getValidColumnFamilies(true, true, keyspaceName, columnFamilies)) - cfStore.sstablesRewrite(excludeCurrentVersion); + { + CompactionManager.AllSSTableOpStatus oneStatus = cfStore.sstablesRewrite(excludeCurrentVersion); + if (oneStatus != CompactionManager.AllSSTableOpStatus.SUCCESSFUL) + status = oneStatus; + } + return status; } public void forceKeyspaceCompaction(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException http://git-wip-us.apache.org/repos/asf/cassandra/blob/17945ab0/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index a036ba7..efe7b06 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -30,6 +30,8 @@ import java.util.concurrent.TimeoutException; import javax.management.NotificationEmitter; import javax.management.openmbean.TabularData; +import org.apache.cassandra.db.compaction.CompactionManager; + public interface StorageServiceMBean extends NotificationEmitter { /** @@ -236,7 +238,7 @@ public interface StorageServiceMBean extends NotificationEmitter /** * Trigger a cleanup of keys on a single keyspace */ - public void forceKeyspaceCleanup(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; + public CompactionManager.AllSSTableOpStatus forceKeyspaceCleanup(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; /** * Scrub (deserialize + reserialize at the latest version, skipping bad rows if any) the given keyspace. @@ -244,13 +246,13 @@ public interface StorageServiceMBean extends NotificationEmitter * * Scrubbed CFs will be snapshotted first, if disableSnapshot is false */ - public void scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; + public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; /** * Rewrite all sstables to the latest version. * Unlike scrub, it doesn't skip bad rows and do not snapshot sstables first. */ - public void upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; + public CompactionManager.AllSSTableOpStatus upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException; /** * Flush all memtables for the given column families, or all columnfamilies for the given keyspace http://git-wip-us.apache.org/repos/asf/cassandra/blob/17945ab0/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 477c333..5cca56e 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -186,21 +186,55 @@ public class NodeProbe implements AutoCloseable jmxc.close(); } - public void forceKeyspaceCleanup(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + public CompactionManager.AllSSTableOpStatus forceKeyspaceCleanup(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { - ssProxy.forceKeyspaceCleanup(keyspaceName, columnFamilies); + return ssProxy.forceKeyspaceCleanup(keyspaceName, columnFamilies); } - public void scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + public CompactionManager.AllSSTableOpStatus scrub(boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { - ssProxy.scrub(disableSnapshot, skipCorrupted, keyspaceName, columnFamilies); + return ssProxy.scrub(disableSnapshot, skipCorrupted, keyspaceName, columnFamilies); } - public void upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + public CompactionManager.AllSSTableOpStatus upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { - ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, columnFamilies); + return ssProxy.upgradeSSTables(keyspaceName, excludeCurrentVersion, columnFamilies); } + public void forceKeyspaceCleanup(PrintStream out, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + { + switch (forceKeyspaceCleanup(keyspaceName, columnFamilies)) + { + case ABORTED: + failed = true; + out.println("Aborted cleaning up atleast one column family in keyspace "+keyspaceName+", check server logs for more information."); + break; + } + } + + public void scrub(PrintStream out, boolean disableSnapshot, boolean skipCorrupted, String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + { + switch (scrub(disableSnapshot, skipCorrupted, keyspaceName, columnFamilies)) + { + case ABORTED: + failed = true; + out.println("Aborted scrubbing atleast one column family in keyspace "+keyspaceName+", check server logs for more information."); + break; + } + } + + public void upgradeSSTables(PrintStream out, String keyspaceName, boolean excludeCurrentVersion, String... columnFamilies) throws IOException, ExecutionException, InterruptedException + { + switch (upgradeSSTables(keyspaceName, excludeCurrentVersion, columnFamilies)) + { + case ABORTED: + failed = true; + out.println("Aborted upgrading sstables for atleast one column family in keyspace "+keyspaceName+", check server logs for more information."); + break; + } + } + + public void forceKeyspaceCompaction(String keyspaceName, String... columnFamilies) throws IOException, ExecutionException, InterruptedException { ssProxy.forceKeyspaceCompaction(keyspaceName, columnFamilies); http://git-wip-us.apache.org/repos/asf/cassandra/blob/17945ab0/src/java/org/apache/cassandra/tools/NodeTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java index f387134..0bfac94 100644 --- a/src/java/org/apache/cassandra/tools/NodeTool.java +++ b/src/java/org/apache/cassandra/tools/NodeTool.java @@ -904,7 +904,7 @@ public class NodeTool try { - probe.forceKeyspaceCleanup(keyspace, cfnames); + probe.forceKeyspaceCleanup(System.out, keyspace, cfnames); } catch (Exception e) { throw new RuntimeException("Error occurred during cleanup", e); @@ -1025,7 +1025,7 @@ public class NodeTool { try { - probe.scrub(disableSnapshot, skipCorrupted, keyspace, cfnames); + probe.scrub(System.out, disableSnapshot, skipCorrupted, keyspace, cfnames); } catch (Exception e) { throw new RuntimeException("Error occurred during flushing", e); @@ -1103,7 +1103,7 @@ public class NodeTool { try { - probe.upgradeSSTables(keyspace, !includeAll, cfnames); + probe.upgradeSSTables(System.out, keyspace, !includeAll, cfnames); } catch (Exception e) { throw new RuntimeException("Error occurred during enabling auto-compaction", e);