Updated Branches:
  refs/heads/trunk d72e9381f -> effdb08b3

Improve LeveledScanner work estimation
patch by Marcus Eriksson; reviewed by jbellis for CASSANDRA-5250


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/effdb08b
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/effdb08b
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/effdb08b

Branch: refs/heads/trunk
Commit: effdb08b341e7c3799a50983c6086d7f4bbaf120
Parents: d72e938
Author: Jonathan Ellis <[email protected]>
Authored: Thu Mar 21 10:06:41 2013 -0400
Committer: Jonathan Ellis <[email protected]>
Committed: Thu Mar 21 10:06:41 2013 -0400

----------------------------------------------------------------------
 CHANGES.txt                                        |    1 +
 .../db/compaction/LeveledCompactionStrategy.java   |   40 ++++++++++--
 .../io/sstable/SSTableBoundedScanner.java          |    5 +-
 .../apache/cassandra/io/sstable/SSTableReader.java |   49 +--------------
 4 files changed, 38 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/effdb08b/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0f9c4f1..b7b0e42 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.0
+ * Improve LeveledScanner work estimation (CASSANDRA-5250)
  * Replace compaction lock with runWithCompactionsDisabled (CASSANDRA-3430)
  * Change Message IDs to ints (CASSANDRA-5307)
  * Move sstable level information into the Stats component, removing the

http://git-wip-us.apache.org/repos/asf/cassandra/blob/effdb08b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
index ffe45ad..d081542 100644
--- a/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
+++ b/src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
@@ -34,7 +34,6 @@ import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.sstable.SSTable;
 import org.apache.cassandra.io.sstable.SSTableReader;
-import org.apache.cassandra.io.sstable.SSTableScanner;
 import org.apache.cassandra.notifications.INotification;
 import org.apache.cassandra.notifications.INotificationConsumer;
 import org.apache.cassandra.notifications.SSTableAddedNotification;
@@ -172,7 +171,9 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
             else
             {
                 // Create a LeveledScanner that only opens one sstable at a 
time, in sorted order
-                scanners.add(new LeveledScanner(byLevel.get(level), range));
+                List<SSTableReader> intersecting = 
LeveledScanner.intersecting(byLevel.get(level), range);
+                if (!intersecting.isEmpty())
+                    scanners.add(new LeveledScanner(intersecting, range));
             }
         }
 
@@ -194,19 +195,46 @@ public class LeveledCompactionStrategy extends AbstractCompactionStrategy implem
         public LeveledScanner(Collection<SSTableReader> sstables, Range<Token> 
range)
         {
             this.range = range;
-            this.sstables = new ArrayList<SSTableReader>(sstables);
+
+            // add only sstables that intersect our range, and estimate how 
much data that involves
+            this.sstables = new ArrayList<SSTableReader>(sstables.size());
+            long length = 0;
+            for (SSTableReader sstable : sstables)
+            {
+                this.sstables.add(sstable);
+                long estimatedKeys = sstable.estimatedKeys();
+                double estKeysInRangeRatio = 1.0;
+
+                if (estimatedKeys > 0 && range != null)
+                    estKeysInRangeRatio = ((double) 
sstable.estimatedKeysForRanges(Collections.singleton(range))) / estimatedKeys;
+
+                length += sstable.uncompressedLength() * estKeysInRangeRatio;
+            }
+
+            totalLength = length;
             Collections.sort(this.sstables, SSTable.sstableComparator);
             sstableIterator = this.sstables.iterator();
+            assert sstableIterator.hasNext(); // caller should check 
intersecting first
             currentScanner = sstableIterator.next().getDirectScanner(range);
+        }
 
-            long length = 0;
+        public static List<SSTableReader> 
intersecting(Collection<SSTableReader> sstables, Range<Token> range)
+        {
+            ArrayList<SSTableReader> filtered = new ArrayList<SSTableReader>();
             for (SSTableReader sstable : sstables)
-                length += sstable.uncompressedLength();
-            totalLength = length;
+            {
+                Range<Token> sstableRange = new 
Range<Token>(sstable.first.getToken(), sstable.last.getToken(), 
sstable.partitioner);
+                if (range == null || sstableRange.intersects(range))
+                    filtered.add(sstable);
+            }
+            return filtered;
         }
 
         protected OnDiskAtomIterator computeNext()
         {
+            if (currentScanner == null)
+                return endOfData();
+
             try
             {
                 while (true)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/effdb08b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
index a571901..0e31896 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
@@ -35,11 +35,10 @@ public class SSTableBoundedScanner extends SSTableScanner
     private final Iterator<Pair<Long, Long>> rangeIterator;
     private Pair<Long, Long> currentRange;
 
-    SSTableBoundedScanner(SSTableReader sstable, boolean skipCache, 
Iterator<Pair<Long, Long>> rangeIterator)
+    SSTableBoundedScanner(SSTableReader sstable, boolean skipCache, 
Range<Token> range)
     {
         super(sstable, skipCache);
-        this.rangeIterator = rangeIterator;
-        assert rangeIterator.hasNext(); // use EmptyCompactionScanner otherwise
+        this.rangeIterator = 
sstable.getPositionsForRanges(Collections.singletonList(range)).iterator();
         currentRange = rangeIterator.next();
         dfile.seek(currentRange.left);
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/effdb08b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index e2ef70c..bbef4ec 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -973,10 +973,7 @@ public class SSTableReader extends SSTable
         if (range == null)
             return getDirectScanner();
 
-        Iterator<Pair<Long, Long>> rangeIterator = 
getPositionsForRanges(Collections.singletonList(range)).iterator();
-        return rangeIterator.hasNext()
-               ? new SSTableBoundedScanner(this, true, rangeIterator)
-               : new EmptyCompactionScanner(getFilename());
+        return new SSTableBoundedScanner(this, true, range);
     }
 
     public FileDataInput getFileDataInput(long position)
@@ -1210,48 +1207,4 @@ public class SSTableReader extends SSTable
             sstable.releaseReference();
         }
     }
-
-    private static class EmptyCompactionScanner implements ICompactionScanner
-    {
-        private final String filename;
-
-        private EmptyCompactionScanner(String filename)
-        {
-            this.filename = filename;
-        }
-
-        public long getLengthInBytes()
-        {
-            return 0;
-        }
-
-        public long getCurrentPosition()
-        {
-            return 0;
-        }
-
-        public String getBackingFiles()
-        {
-            return filename;
-        }
-
-        public void close()
-        {
-        }
-
-        public boolean hasNext()
-        {
-            return false;
-        }
-
-        public OnDiskAtomIterator next()
-        {
-            throw new IndexOutOfBoundsException();
-        }
-
-        public void remove()
-        {
-            throw new UnsupportedOperationException();
-        }
-    }
 }

Reply via email to