Merge branch 'cassandra-2.1' into trunk
Conflicts:
src/java/org/apache/cassandra/db/compaction/CompactionManager.java
src/java/org/apache/cassandra/db/compaction/CompactionTask.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0956a8a7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0956a8a7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0956a8a7
Branch: refs/heads/trunk
Commit: 0956a8a717781c8a748931f04a18a215d7d53869
Parents: 3e305f8 0e83100
Author: Benedict Elliott Smith <[email protected]>
Authored: Fri Sep 19 18:26:23 2014 +0100
Committer: Benedict Elliott Smith <[email protected]>
Committed: Fri Sep 19 18:26:23 2014 +0100
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/db/DataTracker.java | 5 +-
.../compaction/AbstractCompactionStrategy.java | 56 ++++-
.../db/compaction/CompactionManager.java | 199 +++++++++---------
.../cassandra/db/compaction/CompactionTask.java | 204 +++++++++----------
.../compaction/LeveledCompactionStrategy.java | 43 ++--
.../cassandra/db/compaction/Upgrader.java | 3 +-
.../cassandra/utils/CloseableIterator.java | 2 +-
.../LeveledCompactionStrategyTest.java | 2 +-
9 files changed, 289 insertions(+), 226 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0956a8a7/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index e00d990,f55e5d2..0af1681
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,29 -1,5 +1,30 @@@
+3.0
+ * Make assassinate a first class command (CASSANDRA-7935)
+ * Support IN clause on any clustering column (CASSANDRA-4762)
+ * Improve compaction logging (CASSANDRA-7818)
+ * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
+ * Support Java source code for user-defined functions (CASSANDRA-7562)
+ * Require arg types to disambiguate UDF drops (CASSANDRA-7812)
+ * Do anticompaction in groups (CASSANDRA-6851)
+ * Verify that UDF class methods are static (CASSANDRA-7781)
+ * Support pure user-defined functions (CASSANDRA-7395, 7740)
+ * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
+ * Move sstable RandomAccessReader to nio2, which allows using the
+ FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
+ * Remove CQL2 (CASSANDRA-5918)
+ * Add Thrift get_multi_slice call (CASSANDRA-6757)
+ * Optimize fetching multiple cells by name (CASSANDRA-6933)
+ * Allow compilation in java 8 (CASSANDRA-7028)
+ * Make incremental repair default (CASSANDRA-7250)
+ * Enable code coverage thru JaCoCo (CASSANDRA-7226)
+ * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369)
+ * Shorten SSTable path (CASSANDRA-6962)
+ * Use unsafe mutations for most unit tests (CASSANDRA-6969)
+ * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
+
+
2.1.1
+ * Fix resource leak in event of corrupt sstable
* (cqlsh) Add command line option for cqlshrc file path (CASSANDRA-7131)
* Provide visibility into prepared statements churn (CASSANDRA-7921,
CASSANDRA-7930)
* Invalidate prepared statements when their keyspace or table is
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0956a8a7/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 28ab84e,97696a8..6a0e0df
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@@ -19,8 -19,10 +19,9 @@@ package org.apache.cassandra.db.compact
import java.util.*;
+ import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.base.Predicate;
-import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0956a8a7/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 85b7e38,e309cfb..0f8acba
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1031,78 -990,63 +1040,74 @@@ public class CompactionManager implemen
if (!new File(sstable.getFilename()).exists())
{
logger.info("Skipping anticompaction for {}, required sstable
was compacted and is no longer available.", sstable);
+ i.remove();
continue;
}
+ if (groupMaxDataAge < sstable.maxDataAge)
+ groupMaxDataAge = sstable.maxDataAge;
+ }
- logger.info("Anticompacting {}", sstable);
- Set<SSTableReader> sstableAsSet = new HashSet<>();
- sstableAsSet.add(sstable);
+ if (anticompactionGroup.size() == 0)
+ {
+ logger.info("No valid anticompactions for this group, All
sstables were compacted and are no longer available");
+ return 0;
+ }
- File destination = cfs.directories.getDirectoryForNewSSTables();
- SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs,
sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
- SSTableRewriter unRepairedSSTableWriter = new
SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge,
OperationType.ANTICOMPACTION, false);
+ logger.info("Anticompacting {}", anticompactionGroup);
+ Set<SSTableReader> sstableAsSet = new HashSet<>(anticompactionGroup);
- AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
- try (AbstractCompactionStrategy.ScannerList scanners =
strategy.getScanners(new HashSet<>(Collections.singleton(sstable)));
- CompactionController controller = new
CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
- {
-
repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs,
destination, expectedBloomFilterSize, repairedAt, sstable));
-
unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs,
destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE,
sstable));
+ File destination = cfs.directories.getDirectoryForNewSSTables();
+ SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs,
sstableAsSet, groupMaxDataAge, OperationType.ANTICOMPACTION, false);
+ SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs,
sstableAsSet, groupMaxDataAge, OperationType.ANTICOMPACTION, false);
- AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
- List<ICompactionScanner> scanners =
strategy.getScanners(anticompactionGroup);
-
- int expectedBloomFilterSize =
Math.max(cfs.metadata.getMinIndexInterval(),
(int)(SSTableReader.getApproximateKeyCount(anticompactionGroup)));
-
- CompactionIterable ci = new
CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller);
- Iterator<AbstractCompactedRow> iter = ci.iterator();
- while(iter.hasNext())
+ long repairedKeyCount = 0;
+ long unrepairedKeyCount = 0;
- try (CompactionController controller = new CompactionController(cfs,
sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
++ AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
++ try (AbstractCompactionStrategy.ScannerList scanners =
strategy.getScanners(anticompactionGroup);
++ CompactionController controller = new CompactionController(cfs,
sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
+ {
++ int expectedBloomFilterSize =
Math.max(cfs.metadata.getMinIndexInterval(),
(int)(SSTableReader.getApproximateKeyCount(anticompactionGroup)));
++
+
repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs,
destination, expectedBloomFilterSize, repairedAt, sstableAsSet));
+
unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs,
destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE,
sstableAsSet));
+
- CompactionIterable ci = new
CompactionIterable(OperationType.ANTICOMPACTION, scanners, controller);
-
- try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
++ CompactionIterable ci = new
CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller);
++ Iterator<AbstractCompactedRow> iter = ci.iterator();
++ while(iter.hasNext())
+ {
- while(iter.hasNext())
++ AbstractCompactedRow row = iter.next();
++ // if current range from sstable is repaired, save it into
the new repaired sstable
++ if (Range.isInRanges(row.key.getToken(), ranges))
{
-- AbstractCompactedRow row = iter.next();
-- // if current range from sstable is repaired, save it
into the new repaired sstable
-- if (Range.isInRanges(row.key.getToken(), ranges))
-- {
-- repairedSSTableWriter.append(row);
-- repairedKeyCount++;
-- }
-- // otherwise save into the new 'non-repaired' table
-- else
-- {
-- unRepairedSSTableWriter.append(row);
-- unrepairedKeyCount++;
-- }
++ repairedSSTableWriter.append(row);
++ repairedKeyCount++;
++ }
++ // otherwise save into the new 'non-repaired' table
++ else
++ {
++ unRepairedSSTableWriter.append(row);
++ unrepairedKeyCount++;
}
- // we have the same readers being rewritten by both writers,
so we ask the first one NOT to close them
- // so that the second one can do so safely, without leaving
us with references < 0 or any other ugliness
- repairedSSTableWriter.finish(false, repairedAt);
-
unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE);
- // add repaired table with a non-null timestamp field to be
saved in SSTableMetadata#repairedAt
-
anticompactedSSTables.addAll(repairedSSTableWriter.finished());
-
anticompactedSSTables.addAll(unRepairedSSTableWriter.finished());
- }
- catch (Throwable e)
- {
- logger.error("Error anticompacting " + sstable, e);
- repairedSSTableWriter.abort();
- unRepairedSSTableWriter.abort();
}
+ // we have the same readers being rewritten by both writers, so
we ask the first one NOT to close them
+ // so that the second one can do so safely, without leaving us
with references < 0 or any other ugliness
+ repairedSSTableWriter.finish(false, repairedAt);
+
unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE);
+ // add repaired table with a non-null timestamp field to be saved
in SSTableMetadata#repairedAt
+ logger.debug("Repaired {} keys out of {} for {}/{} in {}",
repairedKeyCount,
+
repairedKeyCount + unrepairedKeyCount,
+
cfs.keyspace.getName(),
+
cfs.getColumnFamilyName(),
+
anticompactionGroup);
+ return repairedSSTableWriter.finished().size() +
unRepairedSSTableWriter.finished().size();
}
- String format = "Repaired {} keys of {} for {}/{}";
- logger.debug(format, repairedKeyCount, (repairedKeyCount +
unrepairedKeyCount), cfs.keyspace, cfs.getColumnFamilyName());
- String format2 = "Anticompaction completed successfully,
anticompacted from {} to {} sstable(s).";
- logger.info(format2, repairedSSTables.size(),
anticompactedSSTables.size());
-
- return anticompactedSSTables;
+ catch (Throwable e)
+ {
+ logger.error("Error anticompacting " + anticompactionGroup, e);
+ repairedSSTableWriter.abort();
+ unRepairedSSTableWriter.abort();
+ }
+ return 0;
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0956a8a7/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 4a22d0c,6217348..527f483
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -20,7 -20,9 +20,8 @@@ package org.apache.cassandra.db.compact
import java.io.File;
import java.io.IOException;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
+ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@@ -43,8 -47,7 +44,7 @@@ import org.apache.cassandra.io.sstable.
import org.apache.cassandra.io.sstable.SSTableWriter;
import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
import org.apache.cassandra.service.ActiveRepairService;
--import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.UUIDGen;
public class CompactionTask extends AbstractCompactionTask
{
@@@ -144,120 -137,117 +141,117 @@@
long start = System.nanoTime();
long totalKeysWritten = 0;
- long estimatedTotalKeys =
Math.max(cfs.metadata.getMinIndexInterval(),
SSTableReader.getApproximateKeyCount(actuallyCompact));
- long estimatedSSTables = Math.max(1,
SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
- long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys /
estimatedSSTables);
- logger.debug("Expected bloom filter size : {}", keysPerSSTable);
-
- // TODO: errors when creating the scanners can result in untidied
resources
- AbstractCompactionIterable ci = new
CompactionIterable(compactionType, strategy.getScanners(actuallyCompact),
controller);
- CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
-
- // we can't preheat until the tracker has been set. This doesn't
happen until we tell the cfs to
- // replace the old entries. Track entries to preheat here until then.
- long minRepairedAt = getMinRepairedAt(actuallyCompact);
- // we only need the age of the data that we're actually retaining
- long maxAge = getMaxDataAge(actuallyCompact);
- if (collector != null)
- collector.beginCompaction(ci);
- SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge,
compactionType, offline);
- try
+
+ try (CompactionController controller =
getCompactionController(sstables);)
{
- if (!iter.hasNext())
- {
- // don't mark compacted in the finally block, since if there
_is_ nondeleted data,
- // we need to sync it (via closeAndOpen) first, so there is
no period during which
- // a crash could cause data loss.
- cfs.markObsolete(sstables, compactionType);
- return;
- }
- writer.switchWriter(createCompactionWriter(sstableDirectory,
keysPerSSTable, minRepairedAt));
- while (iter.hasNext())
- {
- if (ci.isStopRequested())
- throw new
CompactionInterruptedException(ci.getCompactionInfo());
+ Set<SSTableReader> actuallyCompact = Sets.difference(sstables,
controller.getFullyExpiredSSTables());
+
+ long estimatedTotalKeys =
Math.max(cfs.metadata.getMinIndexInterval(),
SSTableReader.getApproximateKeyCount(actuallyCompact));
+ long estimatedSSTables = Math.max(1,
SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
+ long keysPerSSTable = (long) Math.ceil((double)
estimatedTotalKeys / estimatedSSTables);
+ logger.debug("Expected bloom filter size : {}", keysPerSSTable);
- AbstractCompactedRow row = iter.next();
- if (writer.append(row) != null)
+ try (AbstractCompactionStrategy.ScannerList scanners =
strategy.getScanners(actuallyCompact))
+ {
+ AbstractCompactionIterable ci = new
CompactionIterable(compactionType, scanners.scanners, controller);
+ Iterator<AbstractCompactedRow> iter = ci.iterator();
+
+ // we can't preheat until the tracker has been set. This
doesn't happen until we tell the cfs to
+ // replace the old entries. Track entries to preheat here
until then.
+ long minRepairedAt = getMinRepairedAt(actuallyCompact);
+ // we only need the age of the data that we're actually
retaining
+ long maxAge = getMaxDataAge(actuallyCompact);
+ if (collector != null)
+ collector.beginCompaction(ci);
+ SSTableRewriter writer = new SSTableRewriter(cfs, sstables,
maxAge, compactionType, offline);
+ try
{
- totalKeysWritten++;
- if
(newSSTableSegmentThresholdReached(writer.currentWriter()))
+ if (!iter.hasNext())
{
-
writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable,
minRepairedAt));
+ // don't mark compacted in the finally block, since
if there _is_ nondeleted data,
+ // we need to sync it (via closeAndOpen) first, so
there is no period during which
+ // a crash could cause data loss.
+ cfs.markObsolete(sstables, compactionType);
+ return;
}
+
+
writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable,
minRepairedAt));
+ while (iter.hasNext())
+ {
+ if (ci.isStopRequested())
+ throw new
CompactionInterruptedException(ci.getCompactionInfo());
+
+ AbstractCompactedRow row = iter.next();
+ if (writer.append(row) != null)
+ {
+ totalKeysWritten++;
+ if
(newSSTableSegmentThresholdReached(writer.currentWriter()))
+ {
+
writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable,
minRepairedAt));
+ }
+ }
+ }
+
+ // don't replace old sstables yet, as we need to mark the
compaction finished in the system table
+ writer.finish(false);
}
- }
+ catch (Throwable t)
+ {
+ writer.abort();
+ throw t;
+ }
+ finally
+ {
- // don't replace old sstables yet, as we need to mark the
compaction finished in the system table
- writer.finish(false);
- }
- catch (Throwable t)
- {
- writer.abort();
- throw t;
- }
- finally
- {
- controller.close();
+ // point of no return -- the new sstables are live on
disk; next we'll start deleting the old ones
+ // (in replaceCompactedSSTables)
+ if (taskId != null)
+ SystemKeyspace.finishCompaction(taskId);
- // point of no return -- the new sstables are live on disk; next
we'll start deleting the old ones
- // (in replaceCompactedSSTables)
- if (taskId != null)
- SystemKeyspace.finishCompaction(taskId);
+ if (collector != null)
+ collector.finishCompaction(ci);
+ }
- if (collector != null)
- collector.finishCompaction(ci);
+ Collection<SSTableReader> oldSStables = this.sstables;
+ List<SSTableReader> newSStables = writer.finished();
+ if (!offline)
+
cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables,
compactionType);
+
+ // log a bunch of statistics about the result and save to
system table compaction_history
+ long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()
- start);
+ long startsize = SSTableReader.getTotalBytes(oldSStables);
+ long endsize = SSTableReader.getTotalBytes(newSStables);
+ double ratio = (double) endsize / (double) startsize;
+
+ StringBuilder newSSTableNames = new StringBuilder();
+ for (SSTableReader reader : newSStables)
+
newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
+
+ double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) /
((double) dTime / 1000) : 0;
+ long totalSourceRows = 0;
+ long[] counts = ci.getMergedRowCounts();
+ StringBuilder mergeSummary = new StringBuilder(counts.length
* 10);
+ Map<Integer, Long> mergedRows = new HashMap<>();
+ for (int i = 0; i < counts.length; i++)
+ {
+ long count = counts[i];
+ if (count == 0)
+ continue;
+
+ int rows = i + 1;
+ totalSourceRows += rows * count;
+ mergeSummary.append(String.format("%d:%d, ", rows,
count));
+ mergedRows.put(rows, count);
+ }
- try
- {
- // We don't expect this to throw, but just in case, we do it
after the cleanup above, to make sure
- // we don't end up with compaction information hanging around
indefinitely in limbo.
- iter.close();
- }
- catch (IOException e)
- {
- throw new RuntimeException(e);
+
SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name,
System.currentTimeMillis(), startsize, endsize, mergedRows);
- logger.info(String.format("Compacted %d sstables to [%s].
%,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions
merged to %,d. Partition merge counts were {%s}",
- oldSStables.size(),
newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime,
mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
++ logger.info(String.format("Compacted (%s) %d sstables to [%s]
to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d
total partitions merged to %,d. Partition merge counts were {%s}",
++ taskIdLoggerMsg,
oldSStables.size(), newSSTableNames.toString(), getLevel(), startsize, endsize,
(int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten,
mergeSummary.toString()));
+ logger.debug(String.format("CF Total Bytes Compacted: %,d",
CompactionTask.addToTotalBytesCompacted(endsize)));
+ logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%:
{}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten -
estimatedTotalKeys)/totalKeysWritten));
}
}
-
- Collection<SSTableReader> oldSStables = this.sstables;
- List<SSTableReader> newSStables = writer.finished();
- if (!offline)
- cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables,
newSStables, compactionType);
-
- // log a bunch of statistics about the result and save to system
table compaction_history
- long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
- long startsize = SSTableReader.getTotalBytes(oldSStables);
- long endsize = SSTableReader.getTotalBytes(newSStables);
- double ratio = (double) endsize / (double) startsize;
-
- StringBuilder newSSTableNames = new StringBuilder();
- for (SSTableReader reader : newSStables)
-
newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
-
- double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) /
((double) dTime / 1000) : 0;
- long totalSourceRows = 0;
- long[] counts = ci.getMergedRowCounts();
- StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
- Map<Integer, Long> mergedRows = new HashMap<>();
- for (int i = 0; i < counts.length; i++)
- {
- long count = counts[i];
- if (count == 0)
- continue;
-
- int rows = i + 1;
- totalSourceRows += rows * count;
- mergeSummary.append(String.format("%d:%d, ", rows, count));
- mergedRows.put(rows, count);
- }
-
- SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(),
cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
- logger.info(String.format("Compacted (%s) %d sstables to [%s] to
level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total
partitions merged to %,d. Partition merge counts were {%s}",
- taskIdLoggerMsg, oldSStables.size(),
newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio *
100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
- logger.debug(String.format("CF Total Bytes Compacted: %,d",
CompactionTask.addToTotalBytesCompacted(endsize)));
- logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}",
totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten -
estimatedTotalKeys)/totalKeysWritten));
}
private long getMinRepairedAt(Set<SSTableReader> actuallyCompact)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0956a8a7/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0956a8a7/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0956a8a7/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------