Merge branch 'cassandra-2.0' into cassandra-2.1
Conflicts:
CHANGES.txt
src/java/org/apache/cassandra/db/compaction/CompactionTask.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/730cc606
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/730cc606
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/730cc606
Branch: refs/heads/trunk
Commit: 730cc60640a683ef455c1b8856e4ed8fc39a460f
Parents: 0ba5f27 2cf4ca3
Author: Marcus Eriksson <[email protected]>
Authored: Wed Apr 1 14:39:50 2015 +0200
Committer: Marcus Eriksson <[email protected]>
Committed: Wed Apr 1 14:39:50 2015 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
src/java/org/apache/cassandra/db/compaction/CompactionTask.java | 3 +++
2 files changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/730cc606/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index ab5fb2d,b6b5caf..3b99c61
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,70 -1,8 +1,71 @@@
-2.0.14:
+2.1.4
+ * Buffer bloom filter serialization (CASSANDRA-9066)
+ * Fix anti-compaction target bloom filter size (CASSANDRA-9060)
+ * Make FROZEN and TUPLE unreserved keywords in CQL (CASSANDRA-9047)
+ * Prevent AssertionError from SizeEstimatesRecorder (CASSANDRA-9034)
+ * Avoid overwriting index summaries for sstables with an older format that
+ does not support downsampling; rebuild summaries on startup when this
+ is detected (CASSANDRA-8993)
+ * Fix potential data loss in CompressedSequentialWriter (CASSANDRA-8949)
+ * Make PasswordAuthenticator number of hashing rounds configurable (CASSANDRA-8085)
+ * Fix AssertionError when binding nested collections in DELETE (CASSANDRA-8900)
+ * Check for overlap with non-early sstables in LCS (CASSANDRA-8739)
+ * Only calculate max purgable timestamp if we have to (CASSANDRA-8914)
+ * (cqlsh) Greatly improve performance of COPY FROM (CASSANDRA-8225)
+ * IndexSummary effectiveIndexInterval is now a guideline, not a rule (CASSANDRA-8993)
+ * Use correct bounds for page cache eviction of compressed files (CASSANDRA-8746)
+ * SSTableScanner enforces its bounds (CASSANDRA-8946)
+ * Cleanup cell equality (CASSANDRA-8947)
+ * Introduce intra-cluster message coalescing (CASSANDRA-8692)
+ * DatabaseDescriptor throws NPE when rpc_interface is used (CASSANDRA-8839)
+ * Don't check if an sstable is live for offline compactions (CASSANDRA-8841)
+ * Don't set clientMode in SSTableLoader (CASSANDRA-8238)
+ * Fix SSTableRewriter with disabled early open (CASSANDRA-8535)
+ * Allow invalidating permissions and cache time (CASSANDRA-8722)
+ * Log warning when queries that will require ALLOW FILTERING in Cassandra 3.0
+ are executed (CASSANDRA-8418)
+ * Fix cassandra-stress so it respects the CL passed in user mode (CASSANDRA-8948)
+ * Fix rare NPE in ColumnDefinition#hasIndexOption() (CASSANDRA-8786)
+ * cassandra-stress reports per-operation statistics, plus misc (CASSANDRA-8769)
+ * Add SimpleDate (cql date) and Time (cql time) types (CASSANDRA-7523)
+ * Use long for key count in cfstats (CASSANDRA-8913)
+ * Make SSTableRewriter.abort() more robust to failure (CASSANDRA-8832)
+ * Remove cold_reads_to_omit from STCS (CASSANDRA-8860)
+ * Make EstimatedHistogram#percentile() use ceil instead of floor (CASSANDRA-8883)
+ * Fix top partitions reporting wrong cardinality (CASSANDRA-8834)
+ * Fix rare NPE in KeyCacheSerializer (CASSANDRA-8067)
+ * Pick sstables for validation as late as possible inc repairs (CASSANDRA-8366)
+ * Fix commitlog getPendingTasks to not increment (CASSANDRA-8856)
+ * Fix parallelism adjustment in range and secondary index queries
+ when the first fetch does not satisfy the limit (CASSANDRA-8856)
+ * Check if the filtered sstables is non-empty in STCS (CASSANDRA-8843)
+ * Upgrade java-driver used for cassandra-stress (CASSANDRA-8842)
+ * Fix CommitLog.forceRecycleAllSegments() memory access error (CASSANDRA-8812)
+ * Improve assertions in Memory (CASSANDRA-8792)
+ * Fix SSTableRewriter cleanup (CASSANDRA-8802)
+ * Introduce SafeMemory for CompressionMetadata.Writer (CASSANDRA-8758)
+ * 'nodetool info' prints exception against older node (CASSANDRA-8796)
+ * Ensure SSTableReader.last corresponds exactly with the file end (CASSANDRA-8750)
+ * Make SSTableWriter.openEarly more robust and obvious (CASSANDRA-8747)
+ * Enforce SSTableReader.first/last (CASSANDRA-8744)
+ * Cleanup SegmentedFile API (CASSANDRA-8749)
+ * Avoid overlap with early compaction replacement (CASSANDRA-8683)
+ * Safer Resource Management++ (CASSANDRA-8707)
+ * Write partition size estimates into a system table (CASSANDRA-7688)
+ * cqlsh: Fix keys() and full() collection indexes in DESCRIBE output
+ (CASSANDRA-8154)
+ * Show progress of streaming in nodetool netstats (CASSANDRA-8886)
+ * IndexSummaryBuilder utilises offheap memory, and shares data between
+ each IndexSummary opened from it (CASSANDRA-8757)
+ * markCompacting only succeeds if the exact SSTableReader instances being
+ marked are in the live set (CASSANDRA-8689)
+ * cassandra-stress support for varint (CASSANDRA-8882)
+ * Fix Adler32 digest for compressed sstables (CASSANDRA-8778)
+ * Add nodetool statushandoff/statusbackup (CASSANDRA-8912)
+ * Use stdout for progress and stats in sstableloader (CASSANDRA-8982)
+Merged from 2.0:
+ * Avoid race in cancelling compactions (CASSANDRA-9070)
* More aggressive check for expired sstables in DTCS (CASSANDRA-8359)
- * Don't set clientMode to true when bulk-loading sstables to avoid
- a NullPointerException (CASSANDRA-8238)
 * Fix ignored index_interval change in ALTER TABLE statements (CASSANDRA-7976)
* Do more aggressive compaction in old time windows in DTCS (CASSANDRA-8360)
* java.lang.AssertionError when reading saved cache (CASSANDRA-8740)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/730cc606/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 4d9b463,9f7c8dd..392034c
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -137,148 -117,188 +137,151 @@@ public class CompactionTask extends Abs
// new sstables from flush can be added during a compaction, but only the compaction can remove them,
// so in our single-threaded compaction world this is a valid way of determining if we're compacting
// all the sstables (that existed when we started)
- logger.info("Compacting {}", toCompact);
+ logger.info("Compacting {}", sstables);
long start = System.nanoTime();
- long totalkeysWritten = 0;
-
- long estimatedTotalKeys = Math.max(cfs.metadata.getIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact, cfs.metadata));
- long estimatedSSTables = Math.max(1, cfs.getExpectedCompactedFileSize(actuallyCompact, compactionType) / strategy.getMaxSSTableBytes());
- long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
- if (logger.isDebugEnabled())
- logger.debug("Expected bloom filter size : " + keysPerSSTable);
-
- AbstractCompactionIterable ci = DatabaseDescriptor.isMultithreadedCompaction()
- ? new ParallelCompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller)
- : new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
- CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
- Map<DecoratedKey, RowIndexEntry> cachedKeys = new HashMap<DecoratedKey, RowIndexEntry>();
-
- // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
- // replace the old entries. Track entries to preheat here until then.
- Map<Descriptor, Map<DecoratedKey, RowIndexEntry>> cachedKeyMap = new HashMap<Descriptor, Map<DecoratedKey, RowIndexEntry>>();
-
- Collection<SSTableReader> sstables = new ArrayList<SSTableReader>();
- Collection<SSTableWriter> writers = new ArrayList<SSTableWriter>();
-
- if (collector != null)
- collector.beginCompaction(ci);
- try
- {
- if (!controller.cfs.getCompactionStrategy().isActive)
- throw new CompactionInterruptedException(ci.getCompactionInfo());
+
- if (!iter.hasNext())
- {
- // don't mark compacted in the finally block, since if there _is_ nondeleted data,
- // we need to sync it (via closeAndOpen) first, so there is no period during which
- // a crash could cause data loss.
- cfs.markObsolete(toCompact, compactionType);
- return;
- }
+ long totalKeysWritten = 0;
- long writeSize = getExpectedWriteSize() / estimatedSSTables;
- Directories.DataDirectory dataDirectory = getWriteDirectory(writeSize);
- SSTableWriter writer = createCompactionWriter(cfs.directories.getLocationForDisk(dataDirectory), keysPerSSTable);
- writers.add(writer);
- while (iter.hasNext())
- {
- if (ci.isStopRequested())
- throw new CompactionInterruptedException(ci.getCompactionInfo());
+ try (CompactionController controller = getCompactionController(sstables);)
+ {
+ Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
- AbstractCompactedRow row = iter.next();
- RowIndexEntry indexEntry = writer.append(row);
- if (indexEntry == null)
- {
- controller.invalidateCachedRow(row.key);
- row.close();
- continue;
- }
+ long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
+ long estimatedSSTables = Math.max(1, cfs.getExpectedCompactedFileSize(actuallyCompact, compactionType) / strategy.getMaxSSTableBytes());
+ long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
+ long expectedSSTableSize = Math.min(getExpectedWriteSize(), strategy.getMaxSSTableBytes());
+ logger.debug("Expected bloom filter size : {}", keysPerSSTable);
- totalkeysWritten++;
+ List<SSTableReader> newSStables;
+ AbstractCompactionIterable ci;
- if (DatabaseDescriptor.getPreheatKeyCache())
+ // SSTableScanners need to be closed before markCompactedSSTablesReplaced call as scanners contain references
+ // to both ifile and dfile and SSTR will throw deletion errors on Windows if it tries to delete before scanner is closed.
+ // See CASSANDRA-8019 and CASSANDRA-8399
+ try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
+ {
+ ci = new CompactionIterable(compactionType, scanners.scanners, controller);
+ Iterator<AbstractCompactedRow> iter = ci.iterator();
+ // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
+ // replace the old entries. Track entries to preheat here until then.
+ long minRepairedAt = getMinRepairedAt(actuallyCompact);
+ // we only need the age of the data that we're actually retaining
+ long maxAge = getMaxDataAge(actuallyCompact);
+ if (collector != null)
+ collector.beginCompaction(ci);
+ long lastCheckObsoletion = start;
+ SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, offline);
+ try
{
- for (SSTableReader sstable : actuallyCompact)
++ if (!controller.cfs.getCompactionStrategy().isActive)
++ throw new CompactionInterruptedException(ci.getCompactionInfo());
+ if (!iter.hasNext())
+ {
+ // don't mark compacted in the finally block, since if there _is_ nondeleted data,
+ // we need to sync it (via closeAndOpen) first, so there is no period during which
+ // a crash could cause data loss.
+ cfs.markObsolete(sstables, compactionType);
+ return;
+ }
+
+ writer.switchWriter(createCompactionWriter(cfs.directories.getLocationForDisk(getWriteDirectory(expectedSSTableSize)), keysPerSSTable, minRepairedAt));
+ while (iter.hasNext())
{
- if (sstable.getCachedPosition(row.key, false) != null)
+ if (ci.isStopRequested())
+ throw new CompactionInterruptedException(ci.getCompactionInfo());
+
+ AbstractCompactedRow row = iter.next();
+ if (writer.append(row) != null)
+ {
+ totalKeysWritten++;
+ if (newSSTableSegmentThresholdReached(writer.currentWriter()))
+ {
+ writer.switchWriter(createCompactionWriter(cfs.directories.getLocationForDisk(getWriteDirectory(expectedSSTableSize)), keysPerSSTable, minRepairedAt));
+ }
+ }
+
+ if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L))
{
- cachedKeys.put(row.key, indexEntry);
- break;
+ controller.maybeRefreshOverlaps();
+ lastCheckObsoletion = System.nanoTime();
}
}
- }
- if (newSSTableSegmentThresholdReached(writer))
+ // don't replace old sstables yet, as we need to mark the compaction finished in the system table
+ newSStables = writer.finish();
+ }
+ catch (Throwable t)
{
- // tmp = false because later we want to query it with descriptor from SSTableReader
- cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
- writeSize = getExpectedWriteSize() / estimatedSSTables;
- dataDirectory = getWriteDirectory(writeSize);
- writer = createCompactionWriter(cfs.directories.getLocationForDisk(dataDirectory), keysPerSSTable);
- writers.add(writer);
- cachedKeys = new HashMap<>();
+ try
+ {
+ writer.abort();
+ }
+ catch (Throwable t2)
+ {
+ t.addSuppressed(t2);
+ }
+ throw t;
}
- }
-
- if (writer.getFilePointer() > 0)
- {
- cachedKeyMap.put(writer.descriptor.asTemporary(false), cachedKeys);
- }
- else
- {
- writer.abort();
- writers.remove(writer);
- }
+ finally
+ {
+ // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
+ // (in replaceCompactedSSTables)
+ if (taskId != null)
+ SystemKeyspace.finishCompaction(taskId);
- long maxAge = getMaxDataAge(toCompact);
- for (SSTableWriter completedWriter : writers)
- sstables.add(completedWriter.closeAndOpenReader(maxAge));
- }
- catch (Throwable t)
- {
- for (SSTableWriter writer : writers)
- writer.abort();
- // also remove already completed SSTables
- for (SSTableReader sstable : sstables)
- {
- sstable.markObsolete();
- sstable.releaseReference();
+ if (collector != null)
+ collector.finishCompaction(ci);
+ }
}
- throw Throwables.propagate(t);
- }
- finally
- {
- controller.close();
-
- // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
- // (in replaceCompactedSSTables)
- if (taskId != null)
- SystemKeyspace.finishCompaction(taskId);
-
- if (collector != null)
- collector.finishCompaction(ci);
- try
+ Collection<SSTableReader> oldSStables = this.sstables;
+ if (!offline)
+ cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
+
+ // log a bunch of statistics about the result and save to system table compaction_history
+ long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+ long startsize = SSTableReader.getTotalBytes(oldSStables);
+ long endsize = SSTableReader.getTotalBytes(newSStables);
+ double ratio = (double) endsize / (double) startsize;
+
+ StringBuilder newSSTableNames = new StringBuilder();
+ for (SSTableReader reader : newSStables)
+ newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
+
+ double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
+ long totalSourceRows = 0;
+ long[] counts = ci.getMergedRowCounts();
+ StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
+ Map<Integer, Long> mergedRows = new HashMap<>();
+ for (int i = 0; i < counts.length; i++)
{
- // We don't expect this to throw, but just in case, we do it after the cleanup above, to make sure
- // we don't end up with compaction information hanging around indefinitely in limbo.
- iter.close();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
- }
- }
+ long count = counts[i];
+ if (count == 0)
+ continue;
- replaceCompactedSSTables(toCompact, sstables);
- // TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
- for (SSTableReader sstable : sstables)
- {
- if (sstable.acquireReference())
- {
- try
- {
- sstable.preheat(cachedKeyMap.get(sstable.descriptor));
- }
- finally
- {
- sstable.releaseReference();
- }
+ int rows = i + 1;
+ totalSourceRows += rows * count;
+ mergeSummary.append(String.format("%d:%d, ", rows, count));
+ mergedRows.put(rows, count);
}
- }
- // log a bunch of statistics about the result and save to system table compaction_history
- long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
- long startsize = SSTable.getTotalBytes(toCompact);
- long endsize = SSTable.getTotalBytes(sstables);
- double ratio = (double) endsize / (double) startsize;
-
- StringBuilder builder = new StringBuilder();
- for (SSTableReader reader : sstables)
- builder.append(reader.descriptor.baseFilename()).append(",");
-
- double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
- long totalSourceRows = 0;
- long[] counts = ci.getMergedRowCounts();
- StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
- Map<Integer, Long> mergedRows = new HashMap<Integer, Long>();
- for (int i = 0; i < counts.length; i++)
- {
- long count = counts[i];
- if (count == 0)
- continue;
-
- int rows = i + 1;
- totalSourceRows += rows * count;
- mergeSummary.append(String.format("%d:%d, ", rows, count));
- mergedRows.put(rows, count);
+ SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
+ logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
+ oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
+ logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+ logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
}
+ }
- SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
- logger.info(String.format("Compacted %d sstables to [%s]. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}",
- toCompact.size(), builder.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalkeysWritten, mergeSummary.toString()));
- logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+ private long getMinRepairedAt(Set<SSTableReader> actuallyCompact)
+ {
+ long minRepairedAt= Long.MAX_VALUE;
+ for (SSTableReader sstable : actuallyCompact)
+ minRepairedAt = Math.min(minRepairedAt, sstable.getSSTableMetadata().repairedAt);
+ if (minRepairedAt == Long.MAX_VALUE)
+ return ActiveRepairService.UNREPAIRED_SSTABLE;
+ return minRepairedAt;
}
protected void checkAvailableDiskSpace(long estimatedSSTables)
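
Note on the new error-handling and resource ordering above: the scanners are opened in a try-with-resources block so they are closed before markCompactedSSTablesReplaced runs (the Windows deletion issue called out in CASSANDRA-8019/8399), and any failure aborts the rewriter while re-throwing the original exception with the cleanup failure attached via addSuppressed. The standalone sketch below restates only that control flow; the Scanner and Rewriter classes here are hypothetical stand-ins, not Cassandra's ScannerList or SSTableRewriter API.

    // Illustrative sketch only -- Scanner and Rewriter are hypothetical stand-ins,
    // not the Cassandra classes touched by this patch.
    import java.util.Arrays;
    import java.util.List;

    public class RewriteSketch
    {
        /** Stand-in for a resource that must be closed before old files are replaced. */
        static class Scanner implements AutoCloseable
        {
            @Override
            public void close() { System.out.println("scanner closed"); }
        }

        /** Stand-in for a rewriter: finish() publishes new files, abort() discards partial output. */
        static class Rewriter
        {
            List<String> finish() { return Arrays.asList("new-sstable-1"); }
            void abort()          { System.out.println("partial output discarded"); }
        }

        static List<String> rewrite()
        {
            List<String> newFiles;
            // try-with-resources guarantees the scanner is closed before the point
            // where the old files would be marked replaced.
            try (Scanner scanner = new Scanner())
            {
                Rewriter writer = new Rewriter();
                try
                {
                    newFiles = writer.finish();
                }
                catch (Throwable t)
                {
                    // Abort on any failure, but keep the original exception primary
                    // and attach the cleanup failure via addSuppressed.
                    try { writer.abort(); }
                    catch (Throwable t2) { t.addSuppressed(t2); }
                    throw t;
                }
            }
            // Only after the scanner is closed would the old files be marked replaced.
            return newFiles;
        }

        public static void main(String[] args)
        {
            System.out.println(rewrite());
        }
    }

Keeping the original Throwable primary (and suppressing any abort failure) preserves the root cause in logs instead of letting a secondary cleanup error mask it, which is the reason the patch uses the addSuppressed idiom rather than a bare abort in a catch block.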