Merge branch 'cassandra-2.0' into cassandra-2.1

Conflicts:
	CHANGES.txt
	src/java/org/apache/cassandra/db/Memtable.java
	src/java/org/apache/cassandra/db/compaction/CompactionTask.java
	src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0d01c365
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0d01c365
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0d01c365

Branch: refs/heads/cassandra-2.1
Commit: 0d01c36599a7721a864780a3a10e134fdfa6797a
Parents: f02d194 2ce1ad8
Author: Marcus Eriksson <marc...@apache.org>
Authored: Mon Nov 24 10:27:50 2014 +0100
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Mon Nov 24 10:27:50 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 src/java/org/apache/cassandra/db/Memtable.java  |  6 +++--
 .../cassandra/db/compaction/CompactionTask.java | 10 ++++-----
 .../cassandra/io/util/DiskAwareRunnable.java    | 23 ++------------------
 4 files changed, 12 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d01c365/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index 96da1bd,6a5ac0d..313000a
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,18 -1,5 +1,19 @@@
-2.0.12:
+2.1.3
+ * Fix high size calculations for prepared statements (CASSANDRA-8231)
+ * Centralize shared executors (CASSANDRA-8055)
+ * Fix filtering for CONTAINS (KEY) relations on frozen collection
+   clustering columns when the query is restricted to a single
+   partition (CASSANDRA-8203)
+ * Do more aggressive entire-sstable TTL expiry checks (CASSANDRA-8243)
+ * Add more log info if readMeter is null (CASSANDRA-8238)
+ * add check of the system wall clock time at startup (CASSANDRA-8305)
+ * Support for frozen collections (CASSANDRA-7859)
+ * Fix overflow on histogram computation (CASSANDRA-8028)
+ * Have paxos reuse the timestamp generation of normal queries (CASSANDRA-7801)
+ * Fix incremental repair not remove parent session on remote (CASSANDRA-8291)
+ * Improve JBOD disk utilization (CASSANDRA-7386)
+Merged from 2.0:
+ * Make LCS split compaction results over all data directories (CASSANDRA-8329)
  * Fix some failing queries that use multi-column relations on COMPACT STORAGE tables (CASSANDRA-8264)
  * Fix InvalidRequestException with ORDER BY (CASSANDRA-8286)


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d01c365/src/java/org/apache/cassandra/db/Memtable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/Memtable.java
index ba3864f,425b352..3ae5da4
--- a/src/java/org/apache/cassandra/db/Memtable.java
+++ b/src/java/org/apache/cassandra/db/Memtable.java
@@@ -306,12 -336,23 +306,14 @@@ public class Memtabl
              return estimatedSize;
          }

-         protected void runWith(File sstableDirectory) throws Exception
+         protected void runMayThrow() throws Exception
          {
+             long writeSize = getExpectedWriteSize();
+             Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
+             File sstableDirectory = cfs.directories.getLocationForDisk(dataDirectory);
              assert sstableDirectory != null : "Flush task is not bound to any disk";
-
-             try
-             {
-                 SSTableReader sstable = writeSortedContents(context, sstableDirectory);
-                 cfs.replaceFlushed(Memtable.this, sstable);
-                 latch.countDown();
-             }
-             finally
-             {
-                 if (dataDirectory != null)
-                     returnWriteDirectory(dataDirectory, writeSize);
-             }
+             SSTableReader sstable = writeSortedContents(context, sstableDirectory);
+             cfs.replaceFlushed(Memtable.this, sstable);
          }

          protected Directories getDirectories()
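The Memtable flush path above now resolves its target data directory up front, sized to the flush's expected write, rather than reserving a directory and releasing it in a finally block. A minimal, self-contained sketch of that size-aware selection idea follows; the class name, directory paths and "pick the emptiest disk that fits" policy are hypothetical stand-ins for illustration, not the Cassandra Directories API.

import java.io.File;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical illustration of size-aware data directory selection; not Cassandra code.
public class DiskAwareFlushExample
{
    // Pick the data directory with the most usable space that can still hold writeSize bytes.
    static Optional<File> pickDirectory(List<File> dataDirectories, long writeSize)
    {
        return dataDirectories.stream()
                              .filter(dir -> dir.getUsableSpace() >= writeSize)
                              .max(Comparator.comparingLong(File::getUsableSpace));
    }

    public static void main(String[] args)
    {
        List<File> dirs = Arrays.asList(new File("/data1"), new File("/data2")); // hypothetical JBOD mounts
        long expectedWriteSize = 64L * 1024 * 1024; // assume a 64 MB flush
        pickDirectory(dirs, expectedWriteSize)
                .ifPresentOrElse(dir -> System.out.println("Flushing to " + dir),
                                 () -> System.out.println("No directory can hold " + expectedWriteSize + " bytes"));
    }
}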
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d01c365/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index b442482,08fe81a..0e8900d
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -106,11 -91,8 +106,11 @@@ public class CompactionTask extends Abs
      {
          // The collection of sstables passed may be empty (but not null); even if
          // it is not empty, it may compact down to nothing if all rows are deleted.
-         assert sstables != null && sstableDirectory != null;
+         assert sstables != null;
+         if (sstables.size() == 0)
+             return;
+
          // Note that the current compaction strategy, is not necessarily the one this task was created under.
          // This should be harmless; see comments to CFS.maybeReloadCompactionStrategy.
          AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
@@@ -133,147 -112,206 +133,147 @@@
          // new sstables from flush can be added during a compaction, but only the compaction can remove them,
          // so in our single-threaded compaction world this is a valid way of determining if we're compacting
          // all the sstables (that existed when we started)
-         logger.info("Compacting {}", toCompact);
+         logger.info("Compacting {}", sstables);

          long start = System.nanoTime();
-         long totalkeysWritten = 0;
-
-         long estimatedTotalKeys = Math.max(cfs.metadata.getIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact, cfs.metadata));
-         long estimatedSSTables = Math.max(1, SSTable.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
-         long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
-         if (logger.isDebugEnabled())
-             logger.debug("Expected bloom filter size : " + keysPerSSTable);
-
-         AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
-                                         ? new ParallelCompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller)
-                                         : new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
-         CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
-         Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
-
-         // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
-         // replace the old entries.  Track entries to preheat here until then.
-         Map<Descriptor, Map<DecoratedKey, RowIndexEntry>> cachedKeyMap = new HashMap<Descriptor, Map<DecoratedKey, RowIndexEntry>>();
-
-         Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
-         Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>();
-
-         if (collector != null)
-             collector.beginCompaction(ci);
-         try
+         long totalKeysWritten = 0;
+
+         try (CompactionController controller = getCompactionController(sstables);)
          {
-             if (!iter.hasNext())
-             {
-                 // don't mark compacted in the finally block, since if there _is_ nondeleted data,
-                 // we need to sync it (via closeAndOpen) first, so there is no period during which
-                 // a crash could cause data loss.
-                 cfs.markObsolete(toCompact, compactionType);
-                 return;
-             }
-             long writeSize = getExpectedWriteSize() / estimatedSSTables;
-             Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
-             SSTableWriter writer = createCompactionWriter(cfs.directories.getLocationForDisk(dataDirectory), keysPerSSTable);
-             writers.add(writer);
-             try
+             Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
+
+             long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
+             long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
+             long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
+             logger.debug("Expected bloom filter size : {}", keysPerSSTable);
+
+             try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
              {
-                 while (iter.hasNext())
+                 AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller);
+                 Iterator<AbstractCompactedRow> iter = ci.iterator();
+                 List<SSTableReader> newSStables;
+                 // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
+                 // replace the old entries.  Track entries to preheat here until then.
+                 long minRepairedAt = getMinRepairedAt(actuallyCompact);
+                 // we only need the age of the data that we're actually retaining
+                 long maxAge = getMaxDataAge(actuallyCompact);
+                 if (collector != null)
+                     collector.beginCompaction(ci);
+                 long lastCheckObsoletion = start;
+                 SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, offline);
+                 try
                  {
-                     if (ci.isStopRequested())
-                         throw new CompactionInterruptedException(ci.getCompactionInfo());
-
-                     AbstractCompactedRow row = iter.next();
-                     RowIndexEntry indexEntry = writer.append(row);
-                     if (indexEntry == null)
+                     if (!iter.hasNext())
                      {
-                         controller.invalidateCachedRow(row.key);
-                         row.close();
-                         continue;
+                         // don't mark compacted in the finally block, since if there _is_ nondeleted data,
+                         // we need to sync it (via closeAndOpen) first, so there is no period during which
+                         // a crash could cause data loss.
+                         cfs.markObsolete(sstables, compactionType);
+                         return;
                      }

-                     writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
-                     totalkeysWritten++;
-
-                     if (DatabaseDescriptor.getPreheatKeyCache())
++                    writer.switchWriter(createCompactionWriter(cfs.directories.getLocationForDisk(getWriteDirectory(estimatedTotalKeys/estimatedSSTables)), keysPerSSTable, minRepairedAt));
+                     while (iter.hasNext())
                      {
-                         for (SSTableReader sstable : actuallyCompact)
+                         if (ci.isStopRequested())
+                             throw new CompactionInterruptedException(ci.getCompactionInfo());
+
+                         AbstractCompactedRow row = iter.next();
+                         if (writer.append(row) != null)
                          {
-                             if (sstable.getCachedPosition(row.key, false) != null)
+                             totalKeysWritten++;
+                             if (newSSTableSegmentThresholdReached(writer.currentWriter()))
                              {
-                                 writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
-                                 cachedKeys.put(row.key, indexEntry);
-                                 break;
++                                writer.switchWriter(createCompactionWriter(cfs.directories.getLocationForDisk(getWriteDirectory(estimatedTotalKeys/estimatedSSTables)), keysPerSSTable, minRepairedAt));
                              }
                          }
-                     }
-                     if (newSSTableSegmentThresholdReached(writer))
-                     {
-                         // tmp = false because later we want to query it with descriptor from SSTableReader
-                         cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
-                         returnWriteDirectory(dataDirectory, writeSize);
-                         // make sure we don't try to call returnWriteDirectory in finally {..} if we throw exception in getWriteDirectory() below:
-                         dataDirectory = null;
-                         writeSize = getExpectedWriteSize() / estimatedSSTables;
-                         dataDirectory = getWriteDirectory(writeSize);
-                         writer = createCompactionWriter(cfs.directories.getLocationForDisk(dataDirectory), keysPerSSTable);
-                         writers.add(writer);
-                         cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
+                         if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L))
+                         {
+                             controller.maybeRefreshOverlaps();
+                             lastCheckObsoletion = System.nanoTime();
+                         }
                      }
-                 }
-             }
-             finally
-             {
-                 if (dataDirectory != null)
-                     returnWriteDirectory(dataDirectory, writeSize);
-             }
-
-             if (writer.getFilePointer() > 0)
-             {
-                 cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
-             }
-             else
-             {
-                 writer.abort();
-                 writers.remove(writer);
-             }
-             long maxAge = getMaxDataAge(toCompact);
-             for (SSTableWriter completedWriter : writers)
-                 sstables.add(completedWriter.closeAndOpenReader(maxAge));
-         }
-         catch (Throwable t)
-         {
-             for (SSTableWriter writer : writers)
-                 writer.abort();
-             // also remove already completed SSTables
-             for (SSTableReader sstable : sstables)
-             {
-                 sstable.markObsolete();
-                 sstable.releaseReference();
-             }
-             throw Throwables.propagate(t);
-         }
-         finally
-         {
-             controller.close();
-
-             // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
-             // (in replaceCompactedSSTables)
-             if (taskId != null)
-                 SystemKeyspace.finishCompaction(taskId);
-
-             if (collector != null)
-                 collector.finishCompaction(ci);
-
-             try
-             {
-                 // We don't expect this to throw, but just in case, we do it after the cleanup above, to make sure
-                 // we don't end up with compaction information hanging around indefinitely in limbo.
-                 iter.close();
-             }
-             catch (IOException e)
-             {
-                 throw new RuntimeException(e);
-             }
-         }
-
-         replaceCompactedSSTables(toCompact, sstables);
-         // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
-         for (SSTableReader sstable : sstables)
-         {
-             if (sstable.acquireReference())
-             {
-                 try
+                     // don't replace old sstables yet, as we need to mark the compaction finished in the system table
+                     newSStables = writer.finish();
+                 }
+                 catch (Throwable t)
                  {
-                     sstable.preheat(cachedKeyMap.get(sstable.descriptor));
+                     writer.abort();
+                     throw t;
                  }
                  finally
                  {
-
-                     sstable.releaseReference();
+                     // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
+                     // (in replaceCompactedSSTables)
+                     if (taskId != null)
+                         SystemKeyspace.finishCompaction(taskId);
+
+                     if (collector != null)
+                         collector.finishCompaction(ci);
                  }
-             }
-         }
-         // log a bunch of statistics about the result and save to system table compaction_history
-         long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-         long startsize = SSTable.getTotalBytes(toCompact);
-         long endsize = SSTable.getTotalBytes(sstables);
-         double ratio = (double) endsize / (double) startsize;
-
-         StringBuilder builder = new StringBuilder();
-         for (SSTableReader reader : sstables)
-             builder.append(reader.descriptor.baseFilename()).append(",");
-
-         double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
-         long totalSourceRows = 0;
-         long[] counts = ci.getMergedRowCounts();
-         StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
-         Map<Integer, Long> mergedRows = new HashMap<Integer, Long>();
-         for (int i = 0; i < counts.length; i++)
-         {
-             long count = counts[i];
-             if (count == 0)
-                 continue;
-
-             int rows = i + 1;
-             totalSourceRows += rows * count;
-             mergeSummary.append(String.format("%d:%d, ", rows, count));
-             mergedRows.put(rows, count);
+                 Collection<SSTableReader> oldSStables = this.sstables;
+                 if (!offline)
+                     cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
+
+                 // log a bunch of statistics about the result and save to system table compaction_history
+                 long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+                 long startsize = SSTableReader.getTotalBytes(oldSStables);
+                 long endsize = SSTableReader.getTotalBytes(newSStables);
+                 double ratio = (double) endsize / (double) startsize;
+
+                 StringBuilder newSSTableNames = new StringBuilder();
+                 for (SSTableReader reader : newSStables)
+                     newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
+
+                 double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
+                 long totalSourceRows = 0;
+                 long[] counts = ci.getMergedRowCounts();
+                 StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
+                 Map<Integer, Long> mergedRows = new HashMap<>();
+                 for (int i = 0; i < counts.length; i++)
+                 {
+                     long count = counts[i];
+                     if (count == 0)
+                         continue;
+
+                     int rows = i + 1;
+                     totalSourceRows += rows * count;
+                     mergeSummary.append(String.format("%d:%d, ", rows, count));
+                     mergedRows.put(rows, count);
+                 }
+
+                 SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
+                 logger.info(String.format("Compacted %d sstables to [%s].  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
+                                           oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
+                 logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+                 logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
+             }
          }
+     }

-         SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
-         logger.info(String.format("Compacted %d sstables to [%s].  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
-                                   toCompact.size(), builder.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalkeysWritten, mergeSummary.toString()));
-         logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+     private long getMinRepairedAt(Set<SSTableReader> actuallyCompact)
+     {
+         long minRepairedAt= Long.MAX_VALUE;
+         for (SSTableReader sstable : actuallyCompact)
+             minRepairedAt = Math.min(minRepairedAt, sstable.getSSTableMetadata().repairedAt);
+         if (minRepairedAt == Long.MAX_VALUE)
+             return ActiveRepairService.UNREPAIRED_SSTABLE;
+         return minRepairedAt;
      }

-     private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable)
+     private SSTableWriter createCompactionWriter(File sstableDirectory, long keysPerSSTable, long repairedAt)
      {
+         assert sstableDirectory != null;
          return new SSTableWriter(cfs.getTempSSTablePath(sstableDirectory),
                                   keysPerSSTable,
+                                  repairedAt,
                                   cfs.metadata,
                                   cfs.partitioner,
-                                  SSTableMetadata.createCollector(toCompact, cfs.metadata.comparator, getLevel()));
+                                  new MetadataCollector(sstables, cfs.metadata.comparator, getLevel()));
      }

      protected int getLevel()


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d01c365/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
index 6d453e5,93b06ab..4188f6e
--- a/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
+++ b/src/java/org/apache/cassandra/io/util/DiskAwareRunnable.java
@@@ -32,18 -27,24 +27,16 @@@ public abstract class DiskAwareRunnabl
          Directories.DataDirectory directory;
          while (true)
          {
-             writeSize = getExpectedWriteSize();
-             directory = getDirectories().getWriteableLocation();
+             directory = getDirectories().getWriteableLocation(writeSize);
              if (directory != null || !reduceScopeForLimitedSpace())
                  break;
          }
          if (directory == null)
              throw new RuntimeException("Insufficient disk space to write " + writeSize + " bytes");

-         runWith(getDirectories().getLocationForDisk(directory));
-         directory.currentTasks.incrementAndGet();
-         directory.estimatedWorkingSize.addAndGet(writeSize);
+         return directory;
      }

-     protected void returnWriteDirectory(Directories.DataDirectory directory, long writeSize)
-     {
-         directory.estimatedWorkingSize.addAndGet(-1 * writeSize);
-         directory.currentTasks.decrementAndGet();
-     }
-
      /**
       * Get sstable directories for the CF.
       * @return Directories instance for the CF.
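The DiskAwareRunnable change above reduces getWriteDirectory to a selection loop: ask for a location that can hold the expected write, shrink the task's scope between attempts, and fail once neither is possible. A self-contained sketch of that retry pattern follows, assuming hypothetical stand-in functional types rather than the real Directories and DiskAwareRunnable APIs.

import java.util.Optional;
import java.util.function.BooleanSupplier;
import java.util.function.LongFunction;

// Hypothetical sketch of the getWriteDirectory retry pattern; not the real Cassandra API.
public class WriteDirectoryExample
{
    // Keep asking for a writeable location for writeSize bytes; between attempts, try to shrink the work
    // (e.g. drop sstables from the compaction) via reduceScope. Fail if space is still insufficient.
    static String getWriteDirectory(long writeSize, LongFunction<Optional<String>> writeableLocation, BooleanSupplier reduceScope)
    {
        while (true)
        {
            Optional<String> directory = writeableLocation.apply(writeSize);
            if (directory.isPresent())
                return directory.get();
            if (!reduceScope.getAsBoolean())
                throw new RuntimeException("Insufficient disk space to write " + writeSize + " bytes");
        }
    }

    public static void main(String[] args)
    {
        // Toy stand-ins: no directory ever has room and the scope cannot be reduced further.
        try
        {
            getWriteDirectory(1L << 30, size -> Optional.empty(), () -> false);
        }
        catch (RuntimeException e)
        {
            System.out.println(e.getMessage()); // Insufficient disk space to write 1073741824 bytes
        }
    }
}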