merge from 1.2
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a7a7edea Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a7a7edea Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a7a7edea Branch: refs/heads/trunk Commit: a7a7edeaaaa48da69c036c6600799842b9bf99db Parents: 08167d6 2b1fb0f Author: Jonathan Ellis <[email protected]> Authored: Sat Nov 16 11:10:56 2013 -0600 Committer: Jonathan Ellis <[email protected]> Committed: Sat Nov 16 11:10:56 2013 -0600 ---------------------------------------------------------------------- CHANGES.txt | 1 + src/java/org/apache/cassandra/db/ColumnFamilyStore.java | 10 ++++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7a7edea/CHANGES.txt ---------------------------------------------------------------------- diff --cc CHANGES.txt index cd6895f,a438f15..57ad75d --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,31 -1,5 +1,32 @@@ -1.2.12 +2.0.3 + * Cancel read meter task when closing SSTR (CASSANDRA-6358) + * free off-heap IndexSummary during bulk (CASSANDRA-6359) + * Recover from IOException in accept() thread (CASSANDRA-6349) + * Improve Gossip tolerance of abnormally slow tasks (CASSANDRA-6338) + * Fix trying to hint timed out counter writes (CASSANDRA-6322) + * Allow restoring specific columnfamilies from archived CL (CASSANDRA-4809) + * Avoid flushing compaction_history after each operation (CASSANDRA-6287) + * Fix repair assertion error when tombstones expire (CASSANDRA-6277) + * Skip loading corrupt key cache (CASSANDRA-6260) + * Fixes for compacting larger-than-memory rows (CASSANDRA-6274) + * Compact hottest sstables first and optionally omit coldest from + compaction entirely (CASSANDRA-6109) + * Fix modifying column_metadata from thrift (CASSANDRA-6182) + * cqlsh: fix LIST USERS output (CASSANDRA-6242) + * Add IRequestSink interface (CASSANDRA-6248) + * Update memtable size while flushing (CASSANDRA-6249) + * Provide hooks around CQL2/CQL3 statement execution (CASSANDRA-6252) + * Require Permission.SELECT for CAS updates (CASSANDRA-6247) + * New CQL-aware SSTableWriter (CASSANDRA-5894) + * Reject CAS operation when the protocol v1 is used (CASSANDRA-6270) + * Correctly throw error when frame too large (CASSANDRA-5981) + * Fix serialization bug in PagedRange with 2ndary indexes (CASSANDRA-6299) + * Fix CQL3 table validation in Thrift (CASSANDRA-6140) + * Fix bug missing results with IN clauses (CASSANDRA-6327) + * Fix paging with reversed slices (CASSANDRA-6343) + * Set minTimestamp correctly to be able to drop expired sstables (CASSANDRA-6337) +Merged from 1.2: + * Invalidate row cache when dropping CF (CASSANDRA-6351) * add non-jamm path for cached statements (CASSANDRA-6293) * (Hadoop) Require CFRR batchSize to be at least 2 (CASSANDRA-6114) * Fix altering column types (CASSANDRA-6185) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a7a7edea/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 4346224,eaadbdd..137d597 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@@ -325,9 -294,15 +325,15 @@@ public class ColumnFamilyStore implemen valid = false; unregisterMBean(); - SystemTable.removeTruncationRecord(metadata.cfId); + SystemKeyspace.removeTruncationRecord(metadata.cfId); data.unreferenceSSTables(); indexManager.invalidate(); + + for (RowCacheKey key : CacheService.instance.rowCache.getKeySet()) + { + if (key.cfId == metadata.cfId) + invalidateCachedRow(key); + } } catch (Exception e) { @@@ -1257,7 -1181,7 +1263,7 @@@ finally { if (sentinelSuccess && data == null) -- CacheService.instance.rowCache.remove(key); ++ invalidateCachedRow(key); } } @@@ -1927,109 -1766,33 +1933,109 @@@ // sleep a little to make sure that our truncatedAt comes after any sstable // that was part of the flushed we forced; otherwise on a tie, it won't get deleted. - try + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS); + } + + // nuke the memtable data w/o writing to disk first + Keyspace.switchLock.writeLock().lock(); + try + { + for (ColumnFamilyStore cfs : concatWithIndexes()) { - long starttime = System.currentTimeMillis(); - while ((System.currentTimeMillis() - starttime) < 1) - { - Thread.sleep(1); - } + Memtable mt = cfs.getMemtableThreadSafe(); + if (!mt.isClean()) + mt.cfs.data.renewMemtable(); } - catch (InterruptedException e) + } + finally + { + Keyspace.switchLock.writeLock().unlock(); + } + + Runnable truncateRunnable = new Runnable() + { + public void run() { - throw new AssertionError(e); + logger.debug("Discarding sstable data for truncated CF + indexes"); + + final long truncatedAt = System.currentTimeMillis(); + if (DatabaseDescriptor.isAutoSnapshot()) + snapshot(Keyspace.getTimestampedSnapshotName(name)); + + ReplayPosition replayAfter = discardSSTables(truncatedAt); + + for (SecondaryIndex index : indexManager.getIndexes()) + index.truncateBlocking(truncatedAt); + + SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, truncatedAt, replayAfter); + + logger.debug("cleaning out row cache"); + for (RowCacheKey key : CacheService.instance.rowCache.getKeySet()) + { + if (key.cfId == metadata.cfId) - CacheService.instance.rowCache.remove(key); ++ invalidateCachedRow(key); + } } - } - else + }; + + runWithCompactionsDisabled(Executors.callable(truncateRunnable), true); + logger.debug("truncate complete"); + } + + public <V> V runWithCompactionsDisabled(Callable<V> callable, boolean interruptValidation) + { + // synchronize so that concurrent invocations don't re-enable compactions partway through unexpectedly, + // and so we only run one major compaction at a time + synchronized (this) { - // just nuke the memtable data w/o writing to disk first - Table.switchLock.writeLock().lock(); + logger.debug("Cancelling in-progress compactions for {}", metadata.cfName); + + Iterable<ColumnFamilyStore> selfWithIndexes = concatWithIndexes(); + for (ColumnFamilyStore cfs : selfWithIndexes) + cfs.getCompactionStrategy().pause(); try { - for (ColumnFamilyStore cfs : concatWithIndexes()) + // interrupt in-progress compactions + Function<ColumnFamilyStore, CFMetaData> f = new Function<ColumnFamilyStore, CFMetaData>() { - Memtable mt = cfs.getMemtableThreadSafe(); - if (!mt.isClean()) + public CFMetaData apply(ColumnFamilyStore cfs) { - mt.cfs.data.renewMemtable(); + return cfs.metadata; + } + }; + Iterable<CFMetaData> allMetadata = Iterables.transform(selfWithIndexes, f); + CompactionManager.instance.interruptCompactionFor(allMetadata, interruptValidation); + + // wait for the interruption to be recognized + long start = System.nanoTime(); + long delay = TimeUnit.MINUTES.toNanos(1); + while (System.nanoTime() - start < delay) + { + if (CompactionManager.instance.isCompacting(selfWithIndexes)) + Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS); + else + break; + } + + // doublecheck that we finished, instead of timing out + for (ColumnFamilyStore cfs : selfWithIndexes) + { + if (!cfs.getDataTracker().getCompacting().isEmpty()) + { + logger.warn("Unable to cancel in-progress compactions for {}. Probably there is an unusually large row in progress somewhere. It is also possible that buggy code left some sstables compacting after it was done with them", metadata.cfName); } } + logger.debug("Compactions successfully cancelled"); + + // run our task + try + { + return callable.call(); + } + catch (Exception e) + { + throw new RuntimeException(e); + } } finally {
