Author: jbellis
Date: Wed Dec 1 22:41:23 2010
New Revision: 1041200
URL: http://svn.apache.org/viewvc?rev=1041200&view=rev
Log:
avoid opening readers on anticompacted to-be-streamed temporary files
patch by thobbs; reviewed by mdennis and jbellis for CASSANDRA-1752
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/StreamingTest.java
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1041200&r1=1041199&r2=1041200&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
Wed Dec 1 22:41:23 2010
@@ -133,11 +133,11 @@ public class CompactionManager implement
return executor.submit(runnable);
}
- public Future<List<SSTableReader>> submitAnticompaction(final
ColumnFamilyStore cfStore, final Collection<Range> ranges, final InetAddress
target)
+ public Future<List<String>> submitAnticompaction(final ColumnFamilyStore
cfStore, final Collection<Range> ranges, final InetAddress target)
{
- Callable<List<SSTableReader>> callable = new
Callable<List<SSTableReader>>()
+ Callable<List<String>> callable = new Callable<List<String>>()
{
- public List<SSTableReader> call() throws IOException
+ public List<String> call() throws IOException
{
return doAntiCompaction(cfStore, cfStore.getSSTables(),
ranges, target);
}
@@ -320,18 +320,7 @@ public class CompactionManager implement
return sstables.size();
}
- /**
- * This function is used to do the anti compaction process , it spits out
the file which has keys that belong to a given range
- * If the target is not specified it spits out the file as a compacted
file with the unecessary ranges wiped out.
- *
- * @param cfs
- * @param sstables
- * @param ranges
- * @param target
- * @return
- * @throws java.io.IOException
- */
- private List<SSTableReader> doAntiCompaction(ColumnFamilyStore cfs,
Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress
target)
+ private SSTableWriter antiCompactionHelper(ColumnFamilyStore cfs,
Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress
target)
throws IOException
{
Table table = cfs.getTable();
@@ -348,10 +337,9 @@ public class CompactionManager implement
// compacting for streaming: send to subdirectory
compactionFileLocation = compactionFileLocation + File.separator +
DatabaseDescriptor.STREAMING_SUBDIR;
}
- List<SSTableReader> results = new ArrayList<SSTableReader>();
+ long totalKeysWritten = 0;
long startTime = System.currentTimeMillis();
- long totalkeysWritten = 0;
int expectedBloomFilterSize =
Math.max(DatabaseDescriptor.getIndexInterval(),
(int)(SSTableReader.getApproximateKeyCount(sstables) / 2));
if (logger.isDebugEnabled())
@@ -364,11 +352,6 @@ public class CompactionManager implement
try
{
- if (!nni.hasNext())
- {
- return results;
- }
-
while (nni.hasNext())
{
CompactionIterator.CompactedRow row = nni.next();
@@ -379,22 +362,61 @@ public class CompactionManager implement
writer = new SSTableWriter(newFilename,
expectedBloomFilterSize, StorageService.getPartitioner());
}
writer.append(row.key, row.buffer);
- totalkeysWritten++;
+ totalKeysWritten++;
}
}
finally
{
ci.close();
}
+ if (writer != null) {
+ List<String> filenames = writer.getAllFilenames();
+ String format = "AntiCompacted to %s. %d/%d bytes for %d keys.
Time: %dms.";
+ long dTime = System.currentTimeMillis() - startTime;
+ long length = new File(filenames.get(filenames.size()
-1)).length(); // Data file is last in the list
+ logger.info(String.format(format, writer.getFilename(),
SSTable.getTotalBytes(sstables), length, totalKeysWritten, dTime));
+ }
+ return writer;
+ }
+ /**
+ * This function is used to do the anti compaction process. It spits out
a file which has keys
+ * that belong to a given range. If the target is not specified it spits
out the file as a compacted file with the
+ * unnecessary ranges wiped out.
+ *
+ * @param cfs
+ * @param sstables
+ * @param ranges
+ * @param target
+ * @return
+ * @throws java.io.IOException
+ */
+ private List<String> doAntiCompaction(ColumnFamilyStore cfs,
Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress
target)
+ throws IOException
+ {
+ List<String> filenames = new ArrayList<String>(SSTable.FILES_ON_DISK);
+ SSTableWriter writer = antiCompactionHelper(cfs, sstables, ranges,
target);
if (writer != null)
{
- results.add(writer.closeAndOpenReader());
- String format = "AntiCompacted to %s. %d/%d bytes for %d keys.
Time: %dms.";
- long dTime = System.currentTimeMillis() - startTime;
- logger.info(String.format(format, writer.getFilename(),
SSTable.getTotalBytes(sstables), results.get(0).length(), totalkeysWritten,
dTime));
+ writer.close();
+ filenames = writer.getAllFilenames();
}
+ return filenames;
+ }
+ /**
+ * Like doAntiCompaction(), but returns an List of SSTableReaders instead
of a list of filenames.
+ * @throws java.io.IOException
+ */
+ private List<SSTableReader>
doAntiCompactionReturnReaders(ColumnFamilyStore cfs, Collection<SSTableReader>
sstables, Collection<Range> ranges, InetAddress target)
+ throws IOException
+ {
+ List<SSTableReader> results = new ArrayList<SSTableReader>(1);
+ SSTableWriter writer = antiCompactionHelper(cfs, sstables, ranges,
target);
+ if (writer != null)
+ {
+ results.add(writer.closeAndOpenReader());
+ }
return results;
}
@@ -407,7 +429,7 @@ public class CompactionManager implement
private void doCleanupCompaction(ColumnFamilyStore cfs) throws IOException
{
Collection<SSTableReader> originalSSTables = cfs.getSSTables();
- List<SSTableReader> sstables = doAntiCompaction(cfs, originalSSTables,
StorageService.instance.getLocalRanges(cfs.getTable().name), null);
+ List<SSTableReader> sstables = doAntiCompactionReturnReaders(cfs,
originalSSTables, StorageService.instance.getLocalRanges(cfs.getTable().name),
null);
if (!sstables.isEmpty())
{
cfs.replaceCompactedSSTables(originalSSTables, sstables);
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java?rev=1041200&r1=1041199&r2=1041200&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java
Wed Dec 1 22:41:23 2010
@@ -282,9 +282,9 @@ public class Table
* do a complete compaction since we can figure out based on the ranges
* whether the files need to be split.
*/
- public List<SSTableReader> forceAntiCompaction(Collection<Range> ranges,
InetAddress target)
+ public List<String> forceAntiCompaction(Collection<Range> ranges,
InetAddress target)
{
- List<SSTableReader> allResults = new ArrayList<SSTableReader>();
+ List<String> allResults = new ArrayList<String>();
Set<String> columnFamilies = tableMetadata.getColumnFamilies();
for ( String columnFamily : columnFamilies )
{
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java?rev=1041200&r1=1041199&r2=1041200&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
Wed Dec 1 22:41:23 2010
@@ -114,7 +114,7 @@ public class SSTableWriter extends SSTab
/**
* Renames temporary SSTable files to valid data, index, and bloom filter
files
*/
- public SSTableReader closeAndOpenReader() throws IOException
+ public void close() throws IOException
{
// bloom filter
FileOutputStream fos = new FileOutputStream(filterFilename());
@@ -136,6 +136,14 @@ public class SSTableWriter extends SSTab
path = rename(path); // important to do this last since index & filter
file names are derived from it
indexSummary.complete();
+ }
+
+ /**
+ * Renames temporary SSTable files to valid data, index, and bloom filter
files and returns an SSTableReader
+ */
+ public SSTableReader closeAndOpenReader() throws IOException
+ {
+ this.close();
return new SSTableReader(path, partitioner, indexSummary, bf);
}
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1041200&r1=1041199&r2=1041200&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
Wed Dec 1 22:41:23 2010
@@ -485,12 +485,12 @@ public class AntiEntropyService
try
{
List<Range> ranges = new ArrayList<Range>(differences);
- final List<SSTableReader> sstables =
CompactionManager.instance.submitAnticompaction(cfstore, ranges, remote).get();
+ final List<String> filenames =
CompactionManager.instance.submitAnticompaction(cfstore, ranges, remote).get();
Future f =
StageManager.getStage(StageManager.STREAM_STAGE).submit(new WrappedRunnable()
{
protected void runMayThrow() throws Exception
{
- StreamOut.transferSSTables(remote, sstables, cf.left);
+ StreamOut.transferSSTables(remote, filenames, cf.left);
StreamOutManager.remove(remote);
}
});
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=1041200&r1=1041199&r2=1041200&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
Wed Dec 1 22:41:23 2010
@@ -112,19 +112,16 @@ public class StreamOut
* Transfers a group of sstables from a single table to the target endpoint
* and then marks them as ready for local deletion.
*/
- public static void transferSSTables(InetAddress target,
List<SSTableReader> sstables, String table) throws IOException
+ public static void transferSSTables(InetAddress target, List<String>
filenames, String table) throws IOException
{
- PendingFile[] pendingFiles = new PendingFile[SSTable.FILES_ON_DISK *
sstables.size()];
+ PendingFile[] pendingFiles = new PendingFile[filenames.size()];
int i = 0;
- for (SSTableReader sstable : sstables)
+ for (String filename : filenames)
{
- for (String filename : sstable.getAllFilenames())
- {
- File file = new File(filename);
- pendingFiles[i++] = new PendingFile(file.getAbsolutePath(),
file.length(), table);
- }
+ File file = new File(filename);
+ pendingFiles[i++] = new PendingFile(file.getAbsolutePath(),
file.length(), table);
}
- logger.info("Stream context metadata " +
StringUtils.join(pendingFiles, ", " + " " + sstables.size() + " sstables."));
+ logger.info("Stream context metadata " +
StringUtils.join(pendingFiles, ", " + " " + filenames.size() + " sstables."));
StreamOutManager.get(target).addFilesToStream(pendingFiles);
StreamInitiateMessage biMessage = new
StreamInitiateMessage(pendingFiles);
Message message =
StreamInitiateMessage.makeStreamInitiateMessage(biMessage);
Modified:
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java?rev=1041200&r1=1041199&r2=1041200&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
(original)
+++
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
Wed Dec 1 22:41:23 2010
@@ -127,7 +127,7 @@ public class ColumnFamilyStoreTest exten
Range r = new Range(partitioner.getToken("0"),
partitioner.getToken("zzzzzzz"));
ranges.add(r);
- List<SSTableReader> fileList =
CompactionManager.instance.submitAnticompaction(store, ranges,
InetAddress.getByName("127.0.0.1")).get();
+ List<String> fileList =
CompactionManager.instance.submitAnticompaction(store, ranges,
InetAddress.getByName("127.0.0.1")).get();
assert fileList.size() >= 1;
}
Modified:
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/StreamingTest.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/StreamingTest.java?rev=1041200&r1=1041199&r2=1041200&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/StreamingTest.java
(original)
+++
cassandra/branches/cassandra-0.6/test/unit/org/apache/cassandra/io/StreamingTest.java
Wed Dec 1 22:41:23 2010
@@ -49,9 +49,10 @@ public class StreamingTest extends Clean
SSTableReader sstable = SSTableUtils.writeSSTable(content);
String tablename = sstable.getTableName();
String cfname = sstable.getColumnFamilyName();
+ List<String> filenames = sstable.getAllFilenames();
// transfer
- StreamOut.transferSSTables(LOCAL, Arrays.asList(sstable), tablename);
+ StreamOut.transferSSTables(LOCAL, filenames, tablename);
// confirm that the SSTable was transferred and registered
ColumnFamilyStore cfstore =
Table.open(tablename).getColumnFamilyStore(cfname);