Fix partition count log message during compaction

Patch by Carl Yeksigian; reviewed by marcuse for CASSANDRA-12184


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9d90b4e2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9d90b4e2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9d90b4e2

Branch: refs/heads/trunk
Commit: 9d90b4e2da19d3dcc19842a3cb9f8be9091f4af0
Parents: bfa8c80
Author: Carl Yeksigian <c...@apache.org>
Authored: Wed Nov 2 12:11:32 2016 -0400
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Thu Nov 10 15:45:16 2016 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |  4 ++++
 .../cassandra/db/compaction/CompactionTask.java | 22 +++++++++++++++-----
 2 files changed, 21 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d90b4e2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cc5b003..8f0e201 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,3 +1,7 @@
+3.0.11
+ * Fix partition count log during compaction (CASSANDRA-12184)
+
+
 3.0.10
  * Batch with multiple conditional updates for the same partition causes AssertionError (CASSANDRA-12867)
  * Make AbstractReplicationStrategy extendable from outside its package (CASSANDRA-12788)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9d90b4e2/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 7e4ed41..0c4e144 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -47,6 +47,17 @@ import org.apache.cassandra.utils.concurrent.Refs;
 
 public class CompactionTask extends AbstractCompactionTask
 {
+    private static class Summary
+    {
+        final String partitionMerge;
+        final long totalSourceRows;
+
+        public Summary(String partitionMerge, long totalSourceRows)
+        {
+            this.partitionMerge = partitionMerge;
+            this.totalSourceRows = totalSourceRows;
+        }
+    }
     protected static final Logger logger = LoggerFactory.getLogger(CompactionTask.class);
     protected final int gcBefore;
     protected final boolean offline;
@@ -213,10 +224,9 @@ public class CompactionTask extends AbstractCompactionTask
                 newSSTableNames.append(reader.descriptor.baseFilename()).append(",");
 
             double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0;
-            long totalSourceRows = 0;
-            String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), mergedRowCounts, startsize, endsize);
+            Summary mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), mergedRowCounts, startsize, endsize);
             logger.debug(String.format("Compacted (%s) %d sstables to [%s] to level=%d.  %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s.  %,d total partitions merged to %,d.  Partition merge counts were {%s}",
-                                      taskId, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary));
+                                      taskId, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, mergeSummary.totalSourceRows, totalKeysWritten, mergeSummary.partitionMerge));
             logger.trace(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize)));
             logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double)(totalKeysWritten - estimatedKeys)/totalKeysWritten));
 
@@ -234,10 +244,11 @@ public class CompactionTask extends AbstractCompactionTask
         return new DefaultCompactionWriter(cfs, directories, transaction, nonExpiredSSTables, offline, keepOriginals);
     }
 
-    public static String updateCompactionHistory(String keyspaceName, String columnFamilyName, long[] mergedRowCounts, long startSize, long endSize)
+    public static Summary updateCompactionHistory(String keyspaceName, String columnFamilyName, long[] mergedRowCounts, long startSize, long endSize)
     {
         StringBuilder mergeSummary = new StringBuilder(mergedRowCounts.length * 10);
         Map<Integer, Long> mergedRows = new HashMap<>();
+        long totalSourceRows = 0;
         for (int i = 0; i < mergedRowCounts.length; i++)
         {
             long count = mergedRowCounts[i];
@@ -245,11 +256,12 @@ public class CompactionTask extends AbstractCompactionTask
                 continue;
 
             int rows = i + 1;
+            totalSourceRows += rows * count;
             mergeSummary.append(String.format("%d:%d, ", rows, count));
             mergedRows.put(rows, count);
         }
         SystemKeyspace.updateCompactionHistory(keyspaceName, columnFamilyName, System.currentTimeMillis(), startSize, endSize, mergedRows);
-        return mergeSummary.toString();
+        return new Summary(mergeSummary.toString(), totalSourceRows);
     }
 
     protected Directories getDirectories()

Reply via email to