drain should flush system CFs too. Patch by jbellis; reviewed by yukim for CASSANDRA-4446
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1fac06a8 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1fac06a8 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1fac06a8 Branch: refs/heads/cassandra-1.2 Commit: 1fac06a848013863f47067f8ff7f769a2a08e276 Parents: ec21288 Author: Jonathan Ellis <[email protected]> Authored: Mon Jan 14 15:34:52 2013 -0600 Committer: Jonathan Ellis <[email protected]> Committed: Mon Jan 14 15:34:52 2013 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/Table.java | 26 ++++++++---- .../apache/cassandra/service/StorageService.java | 31 +++++++++++--- 3 files changed, 43 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fac06a8/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 64cc60c..2e8d2c9 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 1.2.1 + * drain should flush system CFs too (CASSANDRA-4446) * add inter_dc_tcp_nodelay setting (CASSANDRA-5148) * re-allow wrapping ranges for start_token/end_token range pairing (CASSANDRA-5106) * fix validation compaction of empty rows (CASSADRA-5136) http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fac06a8/src/java/org/apache/cassandra/db/Table.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Table.java b/src/java/org/apache/cassandra/db/Table.java index bb5e6ee..d923081 100644 --- a/src/java/org/apache/cassandra/db/Table.java +++ b/src/java/org/apache/cassandra/db/Table.java @@ -75,6 +75,13 @@ public class Table private final ConcurrentMap<UUID, ColumnFamilyStore> columnFamilyStores = new ConcurrentHashMap<UUID, ColumnFamilyStore>(); private final 
Object[] indexLocks; private volatile AbstractReplicationStrategy replicationStrategy; + public static final Function<String,Table> tableTransformer = new Function<String, Table>() + { + public Table apply(String tableName) + { + return Table.open(tableName); + } + }; public static Table open(String table) { @@ -456,14 +463,17 @@ public class Table public static Iterable<Table> all() { - Function<String, Table> transformer = new Function<String, Table>() - { - public Table apply(String tableName) - { - return Table.open(tableName); - } - }; - return Iterables.transform(Schema.instance.getTables(), transformer); + return Iterables.transform(Schema.instance.getTables(), tableTransformer); + } + + public static Iterable<Table> nonSystem() + { + return Iterables.transform(Schema.instance.getNonSystemTables(), tableTransformer); + } + + public static Iterable<Table> system() + { + return Iterables.transform(Schema.systemKeyspaceNames, tableTransformer); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/1fac06a8/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 15339c4..efa7487 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -3193,18 +3193,35 @@ public class StorageService extends NotificationBroadcasterSupport implements IE StorageProxy.instance.verifyNoHintsInProgress(); setMode(Mode.DRAINING, "flushing column families", false); - List<ColumnFamilyStore> cfses = new ArrayList<ColumnFamilyStore>(); - for (String tableName : Schema.instance.getNonSystemTables()) + // count CFs first, since forceFlush could block for the flushWriter to get a queue slot empty + totalCFs = 0; + for (Table table : Table.nonSystem()) + totalCFs += 
table.getColumnFamilyStores().size(); + remainingCFs = totalCFs; + // flush + List<Future<?>> flushes = new ArrayList<Future<?>>(); + for (Table table : Table.nonSystem()) { - Table table = Table.open(tableName); - cfses.addAll(table.getColumnFamilyStores()); + for (ColumnFamilyStore cfs : table.getColumnFamilyStores()) + flushes.add(cfs.forceFlush()); } - totalCFs = remainingCFs = cfses.size(); - for (ColumnFamilyStore cfs : cfses) + // wait for the flushes. + // TODO this is a godawful way to track progress, since they flush in parallel. a long one could + // thus make several short ones "instant" if we wait for them later. + for (Future f : flushes) { - cfs.forceBlockingFlush(); + FBUtilities.waitOnFuture(f); remainingCFs--; } + // flush the system ones after all the rest are done, just in case flushing modifies any system state + // like CASSANDRA-5151. don't bother with progress tracking since system data is tiny. + flushes.clear(); + for (Table table : Table.system()) + { + for (ColumnFamilyStore cfs : table.getColumnFamilyStores()) + flushes.add(cfs.forceFlush()); + } + FBUtilities.waitOnFutures(flushes); ColumnFamilyStore.postFlushExecutor.shutdown(); ColumnFamilyStore.postFlushExecutor.awaitTermination(60, TimeUnit.SECONDS);
