Merge branch 'cassandra-2.1' into trunk

Conflicts:
        src/java/org/apache/cassandra/db/compaction/CompactionManager.java
        src/java/org/apache/cassandra/db/compaction/CompactionTask.java


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0956a8a7
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0956a8a7
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0956a8a7

Branch: refs/heads/trunk
Commit: 0956a8a717781c8a748931f04a18a215d7d53869
Parents: 3e305f8 0e83100
Author: Benedict Elliott Smith <[email protected]>
Authored: Fri Sep 19 18:26:23 2014 +0100
Committer: Benedict Elliott Smith <[email protected]>
Committed: Fri Sep 19 18:26:23 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../org/apache/cassandra/db/DataTracker.java    |   5 +-
 .../compaction/AbstractCompactionStrategy.java  |  56 ++++-
 .../db/compaction/CompactionManager.java        | 199 +++++++++---------
 .../cassandra/db/compaction/CompactionTask.java | 204 +++++++++----------
 .../compaction/LeveledCompactionStrategy.java   |  43 ++--
 .../cassandra/db/compaction/Upgrader.java       |   3 +-
 .../cassandra/utils/CloseableIterator.java      |   2 +-
 .../LeveledCompactionStrategyTest.java          |   2 +-
 9 files changed, 289 insertions(+), 226 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0956a8a7/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index e00d990,f55e5d2..0af1681
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,29 -1,5 +1,30 @@@
 +3.0
 + * Make assassinate a first class command (CASSANDRA-7935)
 + * Support IN clause on any clustering column (CASSANDRA-4762)
 + * Improve compaction logging (CASSANDRA-7818)
 + * Remove YamlFileNetworkTopologySnitch (CASSANDRA-7917)
 + * Support Java source code for user-defined functions (CASSANDRA-7562)
 + * Require arg types to disambiguate UDF drops (CASSANDRA-7812)
 + * Do anticompaction in groups (CASSANDRA-6851)
 + * Verify that UDF class methods are static (CASSANDRA-7781)
 + * Support pure user-defined functions (CASSANDRA-7395, 7740)
 + * Permit configurable timestamps with cassandra-stress (CASSANDRA-7416)
 + * Move sstable RandomAccessReader to nio2, which allows using the
 +   FILE_SHARE_DELETE flag on Windows (CASSANDRA-4050)
 + * Remove CQL2 (CASSANDRA-5918)
 + * Add Thrift get_multi_slice call (CASSANDRA-6757)
 + * Optimize fetching multiple cells by name (CASSANDRA-6933)
 + * Allow compilation in java 8 (CASSANDRA-7028)
 + * Make incremental repair default (CASSANDRA-7250)
 + * Enable code coverage thru JaCoCo (CASSANDRA-7226)
 + * Switch external naming of 'column families' to 'tables' (CASSANDRA-4369) 
 + * Shorten SSTable path (CASSANDRA-6962)
 + * Use unsafe mutations for most unit tests (CASSANDRA-6969)
 + * Fix race condition during calculation of pending ranges (CASSANDRA-7390)
 +
 +
  2.1.1
+  * Fix resource leak in event of corrupt sstable
   * (cqlsh) Add command line option for cqlshrc file path (CASSANDRA-7131)
   * Provide visibility into prepared statements churn (CASSANDRA-7921, CASSANDRA-7930)
   * Invalidate prepared statements when their keyspace or table is

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0956a8a7/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
index 28ab84e,97696a8..6a0e0df
--- a/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
@@@ -19,8 -19,10 +19,9 @@@ package org.apache.cassandra.db.compact
  
  import java.util.*;
  
+ import com.google.common.base.Throwables;
  import com.google.common.collect.ImmutableMap;
  import com.google.common.base.Predicate;
 -import com.google.common.collect.ImmutableMap;
  import com.google.common.collect.Iterables;
  import com.google.common.util.concurrent.RateLimiter;
  import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0956a8a7/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 85b7e38,e309cfb..0f8acba
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@@ -1031,78 -990,63 +1040,74 @@@ public class CompactionManager implemen
              if (!new File(sstable.getFilename()).exists())
              {
                  logger.info("Skipping anticompaction for {}, required sstable was compacted and is no longer available.", sstable);
 +                i.remove();
                  continue;
              }
 +            if (groupMaxDataAge < sstable.maxDataAge)
 +                groupMaxDataAge = sstable.maxDataAge;
 +        }
  
 -            logger.info("Anticompacting {}", sstable);
 -            Set<SSTableReader> sstableAsSet = new HashSet<>();
 -            sstableAsSet.add(sstable);
 +        if (anticompactionGroup.size() == 0)
 +        {
 +            logger.info("No valid anticompactions for this group, All sstables were compacted and are no longer available");
 +            return 0;
 +        }
  
 -            File destination = cfs.directories.getDirectoryForNewSSTables();
 -            SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
 -            SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, sstable.maxDataAge, OperationType.ANTICOMPACTION, false);
 +        logger.info("Anticompacting {}", anticompactionGroup);
 +        Set<SSTableReader> sstableAsSet = new HashSet<>(anticompactionGroup);
  
 -            AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
 -            try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(new HashSet<>(Collections.singleton(sstable)));
 -                 CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
 -            {
 -                repairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, repairedAt, sstable));
 -                unRepairedSSTableWriter.switchWriter(CompactionManager.createWriter(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstable));
 +        File destination = cfs.directories.getDirectoryForNewSSTables();
 +        SSTableRewriter repairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, OperationType.ANTICOMPACTION, false);
 +        SSTableRewriter unRepairedSSTableWriter = new SSTableRewriter(cfs, sstableAsSet, groupMaxDataAge, OperationType.ANTICOMPACTION, false);
  
-         AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
-         List<ICompactionScanner> scanners = strategy.getScanners(anticompactionGroup);
- 
-         int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(anticompactionGroup)));
- 
 -                CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller);
 -                Iterator<AbstractCompactedRow> iter = ci.iterator();
 -                while(iter.hasNext())
 +        long repairedKeyCount = 0;
 +        long unrepairedKeyCount = 0;
-         try (CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
++        AbstractCompactionStrategy strategy = cfs.getCompactionStrategy();
++        try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(anticompactionGroup);
++             CompactionController controller = new CompactionController(cfs, sstableAsSet, CFMetaData.DEFAULT_GC_GRACE_SECONDS))
 +        {
++            int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(anticompactionGroup)));
++
 +            repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet));
 +            unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet));
 +
-             CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners, controller);
- 
-             try (CloseableIterator<AbstractCompactedRow> iter = ci.iterator())
++            CompactionIterable ci = new CompactionIterable(OperationType.ANTICOMPACTION, scanners.scanners, controller);
++            Iterator<AbstractCompactedRow> iter = ci.iterator();
++            while(iter.hasNext())
 +            {
-                 while(iter.hasNext())
++                AbstractCompactedRow row = iter.next();
++                // if current range from sstable is repaired, save it into the new repaired sstable
++                if (Range.isInRanges(row.key.getToken(), ranges))
                  {
--                    AbstractCompactedRow row = iter.next();
--                    // if current range from sstable is repaired, save it into the new repaired sstable
--                    if (Range.isInRanges(row.key.getToken(), ranges))
--                    {
--                        repairedSSTableWriter.append(row);
--                        repairedKeyCount++;
--                    }
--                    // otherwise save into the new 'non-repaired' table
--                    else
--                    {
--                        unRepairedSSTableWriter.append(row);
--                        unrepairedKeyCount++;
--                    }
++                    repairedSSTableWriter.append(row);
++                    repairedKeyCount++;
++                }
++                // otherwise save into the new 'non-repaired' table
++                else
++                {
++                    unRepairedSSTableWriter.append(row);
++                    unrepairedKeyCount++;
                  }
 -                // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
 -                // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
 -                repairedSSTableWriter.finish(false, repairedAt);
 -                unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE);
 -                // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt
 -                anticompactedSSTables.addAll(repairedSSTableWriter.finished());
 -                anticompactedSSTables.addAll(unRepairedSSTableWriter.finished());
 -            }
 -            catch (Throwable e)
 -            {
 -                logger.error("Error anticompacting " + sstable, e);
 -                repairedSSTableWriter.abort();
 -                unRepairedSSTableWriter.abort();
              }
 +            // we have the same readers being rewritten by both writers, so we ask the first one NOT to close them
 +            // so that the second one can do so safely, without leaving us with references < 0 or any other ugliness
 +            repairedSSTableWriter.finish(false, repairedAt);
 +            unRepairedSSTableWriter.finish(ActiveRepairService.UNREPAIRED_SSTABLE);
 +            // add repaired table with a non-null timestamp field to be saved in SSTableMetadata#repairedAt
 +            logger.debug("Repaired {} keys out of {} for {}/{} in {}", repairedKeyCount,
 +                                                                       repairedKeyCount + unrepairedKeyCount,
 +                                                                       cfs.keyspace.getName(),
 +                                                                       cfs.getColumnFamilyName(),
 +                                                                       anticompactionGroup);
 +            return repairedSSTableWriter.finished().size() + unRepairedSSTableWriter.finished().size();
          }
 -        String format = "Repaired {} keys of {} for {}/{}";
 -        logger.debug(format, repairedKeyCount, (repairedKeyCount + unrepairedKeyCount), cfs.keyspace, cfs.getColumnFamilyName());
 -        String format2 = "Anticompaction completed successfully, anticompacted from {} to {} sstable(s).";
 -        logger.info(format2, repairedSSTables.size(), anticompactedSSTables.size());
 -
 -        return anticompactedSSTables;
 +        catch (Throwable e)
 +        {
 +            logger.error("Error anticompacting " + anticompactionGroup, e);
 +            repairedSSTableWriter.abort();
 +            unRepairedSSTableWriter.abort();
 +        }
 +        return 0;
      }
  
      /**

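The anticompaction rework above hinges on holding the strategy's ScannerList and the CompactionController in a single try-with-resources block, so the underlying sstable scanners are released even if appending a row fails (for example on a corrupt sstable). Below is a minimal, self-contained sketch of that close-everything-on-exit pattern; the ScannerList here is a simplified stand-in written for illustration, not the real Cassandra class.

import java.util.ArrayList;
import java.util.List;

public class ScannerListSketch
{
    // Simplified stand-in for AbstractCompactionStrategy.ScannerList: closing the
    // list closes every underlying scanner, even if one of them throws.
    static class ScannerList implements AutoCloseable
    {
        final List<AutoCloseable> scanners;

        ScannerList(List<AutoCloseable> scanners)
        {
            this.scanners = scanners;
        }

        @Override
        public void close()
        {
            RuntimeException first = null;
            for (AutoCloseable scanner : scanners)
            {
                try
                {
                    scanner.close();
                }
                catch (Exception e)
                {
                    if (first == null)
                        first = new RuntimeException(e);
                }
            }
            if (first != null)
                throw first;
        }
    }

    public static void main(String[] args)
    {
        List<AutoCloseable> underlying = new ArrayList<>();
        underlying.add(() -> System.out.println("scanner 1 closed"));
        underlying.add(() -> System.out.println("scanner 2 closed"));

        // try-with-resources guarantees close() runs even if the body throws,
        // which is the shape the anticompaction loop above relies on.
        try (ScannerList scanners = new ScannerList(underlying))
        {
            System.out.println("anticompacting with " + scanners.scanners.size() + " scanners");
        }
    }
}

As the diff shows, the real method additionally aborts both SSTableRewriters in its catch block and returns 0 when anticompaction of the group fails.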
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0956a8a7/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 4a22d0c,6217348..527f483
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@@ -20,7 -20,9 +20,8 @@@ package org.apache.cassandra.db.compact
  import java.io.File;
  import java.io.IOException;
  import java.util.Collection;
 -import java.util.Collections;
  import java.util.HashMap;
+ import java.util.Iterator;
  import java.util.List;
  import java.util.Map;
  import java.util.Set;
@@@ -43,8 -47,7 +44,7 @@@ import org.apache.cassandra.io.sstable.
  import org.apache.cassandra.io.sstable.SSTableWriter;
  import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
  import org.apache.cassandra.service.ActiveRepairService;
--import org.apache.cassandra.utils.CloseableIterator;
 +import org.apache.cassandra.utils.UUIDGen;
  
  public class CompactionTask extends AbstractCompactionTask
  {
@@@ -144,120 -137,117 +141,117 @@@
  
          long start = System.nanoTime();
          long totalKeysWritten = 0;
-         long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
-         long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
-         long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
-         logger.debug("Expected bloom filter size : {}", keysPerSSTable);
- 
-         // TODO: errors when creating the scanners can result in untidied resources
-         AbstractCompactionIterable ci = new CompactionIterable(compactionType, strategy.getScanners(actuallyCompact), controller);
-         CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
- 
-         // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
-         // replace the old entries.  Track entries to preheat here until then.
-         long minRepairedAt = getMinRepairedAt(actuallyCompact);
-         // we only need the age of the data that we're actually retaining
-         long maxAge = getMaxDataAge(actuallyCompact);
-         if (collector != null)
-             collector.beginCompaction(ci);
-         SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
-         try
+ 
+         try (CompactionController controller = getCompactionController(sstables);)
          {
-             if (!iter.hasNext())
-             {
-                 // don't mark compacted in the finally block, since if there _is_ nondeleted data,
-                 // we need to sync it (via closeAndOpen) first, so there is no period during which
-                 // a crash could cause data loss.
-                 cfs.markObsolete(sstables, compactionType);
-                 return;
-             }
  
-             writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
-             while (iter.hasNext())
-             {
-                 if (ci.isStopRequested())
-                     throw new CompactionInterruptedException(ci.getCompactionInfo());
+             Set<SSTableReader> actuallyCompact = Sets.difference(sstables, controller.getFullyExpiredSSTables());
+ 
+             long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(actuallyCompact));
+             long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(actuallyCompact) / strategy.getMaxSSTableBytes());
+             long keysPerSSTable = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables);
+             logger.debug("Expected bloom filter size : {}", keysPerSSTable);
  
-                 AbstractCompactedRow row = iter.next();
-                 if (writer.append(row) != null)
+             try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact))
+             {
+                 AbstractCompactionIterable ci = new CompactionIterable(compactionType, scanners.scanners, controller);
+                 Iterator<AbstractCompactedRow> iter = ci.iterator();
+ 
+                 // we can't preheat until the tracker has been set. This doesn't happen until we tell the cfs to
+                 // replace the old entries.  Track entries to preheat here until then.
+                 long minRepairedAt = getMinRepairedAt(actuallyCompact);
+                 // we only need the age of the data that we're actually retaining
+                 long maxAge = getMaxDataAge(actuallyCompact);
+                 if (collector != null)
+                     collector.beginCompaction(ci);
+                 SSTableRewriter writer = new SSTableRewriter(cfs, sstables, maxAge, compactionType, offline);
+                 try
                  {
-                     totalKeysWritten++;
-                     if (newSSTableSegmentThresholdReached(writer.currentWriter()))
+                     if (!iter.hasNext())
                      {
-                         writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+                         // don't mark compacted in the finally block, since if there _is_ nondeleted data,
+                         // we need to sync it (via closeAndOpen) first, so there is no period during which
+                         // a crash could cause data loss.
+                         cfs.markObsolete(sstables, compactionType);
+                         return;
                      }
+ 
+                     writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+                     while (iter.hasNext())
+                     {
+                         if (ci.isStopRequested())
+                             throw new CompactionInterruptedException(ci.getCompactionInfo());
+ 
+                         AbstractCompactedRow row = iter.next();
+                         if (writer.append(row) != null)
+                         {
+                             totalKeysWritten++;
+                             if (newSSTableSegmentThresholdReached(writer.currentWriter()))
+                             {
+                                 writer.switchWriter(createCompactionWriter(sstableDirectory, keysPerSSTable, minRepairedAt));
+                             }
+                         }
+                     }
+ 
+                     // don't replace old sstables yet, as we need to mark the compaction finished in the system table
+                     writer.finish(false);
                  }
-             }
+                 catch (Throwable t)
+                 {
+                     writer.abort();
+                     throw t;
+                 }
+                 finally
+                 {
  
-             // don't replace old sstables yet, as we need to mark the compaction finished in the system table
-             writer.finish(false);
-         }
-         catch (Throwable t)
-         {
-             writer.abort();
-             throw t;
-         }
-         finally
-         {
-             controller.close();
+                     // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
+                     // (in replaceCompactedSSTables)
+                     if (taskId != null)
+                         SystemKeyspace.finishCompaction(taskId);
  
-             // point of no return -- the new sstables are live on disk; next we'll start deleting the old ones
-             // (in replaceCompactedSSTables)
-             if (taskId != null)
-                 SystemKeyspace.finishCompaction(taskId);
+                     if (collector != null)
+                         collector.finishCompaction(ci);
+                 }
  
-             if (collector != null)
-                 collector.finishCompaction(ci);
+                 Collection<SSTableReader> oldSStables = this.sstables;
+                 List<SSTableReader> newSStables = writer.finished();
+                 if (!offline)
+                     cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
+ 
+                 // log a bunch of statistics about the result and save to system table compaction_history
+                 long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+                 long startsize = SSTableReader.getTotalBytes(oldSStables);
+                 long endsize = SSTableReader.getTotalBytes(newSStables);
+                 double ratio = (double) endsize / (double) startsize;
+ 
+                 StringBuilder newSSTableNames = new StringBuilder();
+                 for (SSTableReader reader : newSStables)
+                     newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
+ 
+                 double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
+                 long totalSourceRows = 0;
+                 long[] counts = ci.getMergedRowCounts();
+                 StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
+                 Map<Integer, Long> mergedRows = new HashMap<>();
+                 for (int i = 0; i < counts.length; i++)
+                 {
+                     long count = counts[i];
+                     if (count == 0)
+                         continue;
+ 
+                     int rows = i + 1;
+                     totalSourceRows += rows * count;
+                     mergeSummary.append(String.format("%d:%d, ", rows, count));
+                     mergedRows.put(rows, count);
+                 }
  
-             try
-             {
-                 // We don't expect this to throw, but just in case, we do it after the cleanup above, to make sure
-                 // we don't end up with compaction information hanging around indefinitely in limbo.
-                 iter.close();
-             }
-             catch (IOException e)
-             {
-                 throw new RuntimeException(e);
+                 SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
 -                logger.info(String.format("Compacted %d sstables to [%s].  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
 -                                          oldSStables.size(), newSSTableNames.toString(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
++                logger.info(String.format("Compacted (%s) %d sstables to [%s] to level=%d.  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
++                                          taskIdLoggerMsg, oldSStables.size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
+                 logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
+                 logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
              }
          }
- 
-         Collection<SSTableReader> oldSStables = this.sstables;
-         List<SSTableReader> newSStables = writer.finished();
-         if (!offline)
-             cfs.getDataTracker().markCompactedSSTablesReplaced(oldSStables, newSStables, compactionType);
- 
-         // log a bunch of statistics about the result and save to system table compaction_history
-         long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-         long startsize = SSTableReader.getTotalBytes(oldSStables);
-         long endsize = SSTableReader.getTotalBytes(newSStables);
-         double ratio = (double) endsize / (double) startsize;
- 
-         StringBuilder newSSTableNames = new StringBuilder();
-         for (SSTableReader reader : newSStables)
-             newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
- 
-         double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
-         long totalSourceRows = 0;
-         long[] counts = ci.getMergedRowCounts();
-         StringBuilder mergeSummary = new StringBuilder(counts.length * 10);
-         Map<Integer, Long> mergedRows = new HashMap<>();
-         for (int i = 0; i < counts.length; i++)
-         {
-             long count = counts[i];
-             if (count == 0)
-                 continue;
- 
-             int rows = i + 1;
-             totalSourceRows += rows * count;
-             mergeSummary.append(String.format("%d:%d, ", rows, count));
-             mergedRows.put(rows, count);
-         }
- 
-         SystemKeyspace.updateCompactionHistory(cfs.keyspace.getName(), cfs.name, System.currentTimeMillis(), startsize, endsize, mergedRows);
-         logger.info(String.format("Compacted (%s) %d sstables to [%s] to level=%d.  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
-                                   taskIdLoggerMsg, oldSStables.size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary.toString()));
-         logger.debug(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
-         logger.debug("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedTotalKeys, ((double)(totalKeysWritten - estimatedTotalKeys)/totalKeysWritten));
      }
  
      private long getMinRepairedAt(Set<SSTableReader> actuallyCompact)

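The CompactionTask change follows the same idea: the CompactionController and the strategy's ScannerList are opened in try-with-resources, the SSTableRewriter keeps an explicit abort() on failure, and the finally block still records the compaction as finished. A rough sketch of that nesting is below, using hypothetical stand-in types rather than the real Cassandra classes.

public class CompactionFlowSketch
{
    // Hypothetical stand-ins for CompactionController / ScannerList and SSTableRewriter;
    // they only model the open/close/abort lifecycle, not real Cassandra behaviour.
    static class Resource implements AutoCloseable
    {
        final String name;
        Resource(String name) { this.name = name; }
        @Override
        public void close() { System.out.println(name + " closed"); }
    }

    static class Rewriter
    {
        void append(String row) { System.out.println("appended " + row); }
        void finish()           { System.out.println("rewriter finished"); }
        void abort()            { System.out.println("rewriter aborted"); }
    }

    public static void main(String[] args)
    {
        // Outer resources close automatically on exit, mirroring the nesting the diff
        // introduces: controller -> scanners -> writer, with abort() only on failure.
        try (Resource controller = new Resource("controller");
             Resource scanners = new Resource("scanners"))
        {
            Rewriter writer = new Rewriter();
            try
            {
                writer.append("row-1");
                writer.finish();
            }
            catch (Throwable t)
            {
                writer.abort();
                throw t;
            }
            finally
            {
                // the real code records the finished compaction in the system keyspace here
                System.out.println("compaction marked finished");
            }
        }
    }
}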
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0956a8a7/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0956a8a7/src/java/org/apache/cassandra/db/compaction/Upgrader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0956a8a7/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
