Author: jbellis
Date: Sun Aug 7 01:51:44 2011
New Revision: 1154635
URL: http://svn.apache.org/viewvc?rev=1154635&view=rev
Log:
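Refactor CompactionIterator into CompactionIterable: the class now implements
Iterable<AbstractCompactedRow> instead of CloseableIterator<AbstractCompactedRow>;
callers obtain a CloseableIterator from iterator() and close that iterator
rather than the CompactionIterable itself.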
patch by jbellis; reviewed by slebresne for CASSANDRA-2901
Added:
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
- copied, changed from r1154426,
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
Removed:
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
Copied:
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
(from r1154426,
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java)
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java?p2=cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java&p1=cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java&r1=1154426&r2=1154635&rev=1154635&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterator.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionIterable.java
Sun Aug 7 01:51:44 2011
@@ -38,15 +38,16 @@ import org.apache.cassandra.service.Stor
import org.apache.cassandra.utils.CloseableIterator;
import org.apache.cassandra.utils.MergeIterator;
-public class CompactionIterator
-implements CloseableIterator<AbstractCompactedRow>, CompactionInfo.Holder
+public class CompactionIterable
+implements Iterable<AbstractCompactedRow>, CompactionInfo.Holder
{
- private static Logger logger =
LoggerFactory.getLogger(CompactionIterator.class);
+ private static Logger logger =
LoggerFactory.getLogger(CompactionIterable.class);
public static final int FILE_BUFFER_SIZE = 1024 * 1024;
- private final MergeIterator<IColumnIterator, AbstractCompactedRow> source;
+ private MergeIterator<IColumnIterator, AbstractCompactedRow> source;
protected final CompactionType type;
+ private final List<SSTableScanner> scanners;
protected final CompactionController controller;
private long totalBytes;
@@ -61,16 +62,16 @@ implements CloseableIterator<AbstractCom
// current target bytes to compact per millisecond
private int targetBytesPerMS = -1;
- public CompactionIterator(CompactionType type, Iterable<SSTableReader>
sstables, CompactionController controller) throws IOException
+ public CompactionIterable(CompactionType type, Iterable<SSTableReader>
sstables, CompactionController controller) throws IOException
{
this(type, getScanners(sstables), controller);
}
- protected CompactionIterator(CompactionType type, List<SSTableScanner>
scanners, CompactionController controller)
+ protected CompactionIterable(CompactionType type, List<SSTableScanner>
scanners, CompactionController controller)
{
this.type = type;
+ this.scanners = scanners;
this.controller = controller;
- this.source = MergeIterator.get(scanners, ICOMP, new Reducer());
row = 0;
totalBytes = bytesRead = 0;
for (SSTableScanner scanner : scanners)
@@ -94,20 +95,9 @@ implements CloseableIterator<AbstractCom
totalBytes);
}
-
- public boolean hasNext()
- {
- return source.hasNext();
- }
-
- public AbstractCompactedRow next()
- {
- return source.next();
- }
-
- public void remove()
+ public CloseableIterator<AbstractCompactedRow> iterator()
{
- throw new UnsupportedOperationException();
+ return MergeIterator.get(scanners, ICOMP, new Reducer());
}
private void throttle()
@@ -151,16 +141,6 @@ implements CloseableIterator<AbstractCom
timeAtLastDelay = System.currentTimeMillis();
}
- public void close() throws IOException
- {
- source.close();
- }
-
- protected Iterable<SSTableScanner> getScanners()
- {
- return (Iterable<SSTableScanner>)(source.iterators());
- }
-
public String toString()
{
return this.getCompactionInfo().toString();
@@ -201,7 +181,7 @@ implements CloseableIterator<AbstractCom
if ((row++ % 1000) == 0)
{
bytesRead = 0;
- for (SSTableScanner scanner : getScanners())
+ for (SSTableScanner scanner : scanners)
{
bytesRead += scanner.getFilePointer();
}
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1154635&r1=1154634&r2=1154635&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
Sun Aug 7 01:51:44 2011
@@ -687,7 +687,7 @@ public class CompactionManager implement
if (compactionFileLocation == null)
throw new IOException("disk full");
- SSTableScanner scanner =
sstable.getDirectScanner(CompactionIterator.FILE_BUFFER_SIZE);
+ SSTableScanner scanner =
sstable.getDirectScanner(CompactionIterable.FILE_BUFFER_SIZE);
SortedSet<ByteBuffer> indexedColumns = cfs.getIndexedColumns();
CleanupInfo ci = new CleanupInfo(sstable, scanner);
executor.beginCompaction(ci);
@@ -795,11 +795,12 @@ public class CompactionManager implement
}
Collection<SSTableReader> sstables =
cfs.markCurrentSSTablesReferenced();
- CompactionIterator ci = new ValidationCompactionIterator(cfs,
sstables, validator.request.range);
+ CompactionIterable ci = new ValidationCompactionIterable(cfs,
sstables, validator.request.range);
+ CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
validationExecutor.beginCompaction(ci);
try
{
- Iterator<AbstractCompactedRow> nni = Iterators.filter(ci,
Predicates.notNull());
+ Iterator<AbstractCompactedRow> nni = Iterators.filter(iter,
Predicates.notNull());
// validate the CF as we iterate over it
validator.prepare(cfs);
@@ -813,7 +814,7 @@ public class CompactionManager implement
finally
{
SSTableReader.releaseReferences(sstables);
- ci.close();
+ iter.close();
validationExecutor.finishCompaction(ci);
}
}
@@ -922,9 +923,9 @@ public class CompactionManager implement
: (int) (System.currentTimeMillis() / 1000) -
cfs.metadata.getGcGraceSeconds();
}
- private static class ValidationCompactionIterator extends
CompactionIterator
+ private static class ValidationCompactionIterable extends
CompactionIterable
{
- public ValidationCompactionIterator(ColumnFamilyStore cfs,
Collection<SSTableReader> sstables, Range range) throws IOException
+ public ValidationCompactionIterable(ColumnFamilyStore cfs,
Collection<SSTableReader> sstables, Range range) throws IOException
{
super(CompactionType.VALIDATION,
getScanners(sstables, range),
Modified:
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java?rev=1154635&r1=1154634&r2=1154635&view=diff
==============================================================================
---
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
(original)
+++
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
Sun Aug 7 01:51:44 2011
@@ -41,6 +41,7 @@ import org.apache.cassandra.db.compactio
import org.apache.cassandra.io.sstable.SSTable;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.sstable.SSTableWriter;
+import org.apache.cassandra.utils.CloseableIterator;
public class CompactionTask extends AbstractCompactionTask
{
@@ -129,8 +130,9 @@ public class CompactionTask extends Abst
SSTableWriter writer = null;
final SSTableReader ssTable;
- CompactionIterator ci = new CompactionIterator(type, toCompact,
controller); // retain a handle so we can call close()
- Iterator<AbstractCompactedRow> nni = Iterators.filter(ci,
Predicates.notNull());
+ CompactionIterable ci = new CompactionIterable(type, toCompact,
controller); // retain a handle so we can call close()
+ CloseableIterator<AbstractCompactedRow> iter = ci.iterator();
+ Iterator<AbstractCompactedRow> nni = Iterators.filter(iter,
Predicates.notNull());
Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
if (collector != null)
@@ -169,7 +171,7 @@ public class CompactionTask extends Abst
}
finally
{
- ci.close();
+ iter.close();
if (collector != null)
collector.finishCompaction(ci);
if (writer != null)
Modified:
cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
URL:
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java?rev=1154635&r1=1154634&r2=1154635&view=diff
==============================================================================
---
cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
(original)
+++
cassandra/trunk/test/unit/org/apache/cassandra/io/LazilyCompactedRowTest.java
Sun Aug 7 01:51:44 2011
@@ -28,6 +28,7 @@ import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
@@ -58,8 +59,8 @@ public class LazilyCompactedRowTest exte
private void assertBytes(ColumnFamilyStore cfs, int gcBefore) throws
IOException
{
Collection<SSTableReader> sstables = cfs.getSSTables();
- CompactionIterator ci1 = new
CompactionIterator(CompactionType.UNKNOWN, sstables, new
PreCompactingController(cfs, sstables, gcBefore, false));
- CompactionIterator ci2 = new
CompactionIterator(CompactionType.UNKNOWN, sstables, new
LazilyCompactingController(cfs, sstables, gcBefore, false));
+ Iterator<AbstractCompactedRow> ci1 = new
CompactionIterable(CompactionType.UNKNOWN, sstables, new
PreCompactingController(cfs, sstables, gcBefore, false)).iterator();
+ Iterator<AbstractCompactedRow> ci2 = new
CompactionIterable(CompactionType.UNKNOWN, sstables, new
LazilyCompactingController(cfs, sstables, gcBefore, false)).iterator();
while (true)
{
@@ -133,8 +134,8 @@ public class LazilyCompactedRowTest exte
private void assertDigest(ColumnFamilyStore cfs, int gcBefore) throws
IOException, NoSuchAlgorithmException
{
Collection<SSTableReader> sstables = cfs.getSSTables();
- CompactionIterator ci1 = new
CompactionIterator(CompactionType.UNKNOWN, sstables, new
PreCompactingController(cfs, sstables, gcBefore, false));
- CompactionIterator ci2 = new
CompactionIterator(CompactionType.UNKNOWN, sstables, new
LazilyCompactingController(cfs, sstables, gcBefore, false));
+ Iterator<AbstractCompactedRow> ci1 = new
CompactionIterable(CompactionType.UNKNOWN, sstables, new
PreCompactingController(cfs, sstables, gcBefore, false)).iterator();
+ Iterator<AbstractCompactedRow> ci2 = new
CompactionIterable(CompactionType.UNKNOWN, sstables, new
LazilyCompactingController(cfs, sstables, gcBefore, false)).iterator();
while (true)
{