Author: jbellis
Date: Wed Dec 1 17:50:06 2010
New Revision: 1041105
URL: http://svn.apache.org/viewvc?rev=1041105&view=rev
Log:
avoid opening readers on anticompacted to-be-streamed temporary files
patch by thobbs; reviewed by mdennis and jbellis for CASSANDRA-1752
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=1041105&r1=1041104&r2=1041105&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Wed Dec 1 17:50:06 2010
@@ -10,6 +10,8 @@
* detect and warn when obsolete version of JNA is present (CASSANDRA-1770)
* fix live-column-count of slice ranges including tombstoned supercolumn
with live subcolumn (CASSANDRA-1591)
+ * avoid opening readers on anticompacted to-be-streamed temporary
+ files (CASSANDRA-1752)
0.6.8
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1041105&r1=1041104&r2=1041105&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
Wed Dec 1 17:50:06 2010
@@ -133,11 +133,11 @@ public class CompactionManager implement
return executor.submit(runnable);
}
- public Future<List<SSTableReader>> submitAnticompaction(final
ColumnFamilyStore cfStore, final Collection<Range> ranges, final InetAddress
target)
+ public Future<List<String>> submitAnticompaction(final ColumnFamilyStore
cfStore, final Collection<Range> ranges, final InetAddress target)
{
- Callable<List<SSTableReader>> callable = new
Callable<List<SSTableReader>>()
+ Callable<List<String>> callable = new Callable<List<String>>()
{
- public List<SSTableReader> call() throws IOException
+ public List<String> call() throws IOException
{
return doAntiCompaction(cfStore, cfStore.getSSTables(),
ranges, target);
}
@@ -320,18 +320,7 @@ public class CompactionManager implement
return sstables.size();
}
- /**
- * This function is used to do the anti compaction process , it spits out
the file which has keys that belong to a given range
- * If the target is not specified it spits out the file as a compacted
file with the unecessary ranges wiped out.
- *
- * @param cfs
- * @param sstables
- * @param ranges
- * @param target
- * @return
- * @throws java.io.IOException
- */
- private List<SSTableReader> doAntiCompaction(ColumnFamilyStore cfs,
Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress
target)
+ private SSTableWriter antiCompactionHelper(ColumnFamilyStore cfs,
Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress
target)
throws IOException
{
Table table = cfs.getTable();
@@ -348,10 +337,9 @@ public class CompactionManager implement
// compacting for streaming: send to subdirectory
compactionFileLocation = compactionFileLocation + File.separator +
DatabaseDescriptor.STREAMING_SUBDIR;
}
- List<SSTableReader> results = new ArrayList<SSTableReader>();
long startTime = System.currentTimeMillis();
- long totalkeysWritten = 0;
+ long totalKeysWritten = 0;
int expectedBloomFilterSize =
Math.max(DatabaseDescriptor.getIndexInterval(),
(int)(SSTableReader.getApproximateKeyCount(sstables) / 2));
if (logger.isDebugEnabled())
@@ -364,11 +352,6 @@ public class CompactionManager implement
try
{
- if (!nni.hasNext())
- {
- return results;
- }
-
while (nni.hasNext())
{
CompactionIterator.CompactedRow row = nni.next();
@@ -379,7 +362,7 @@ public class CompactionManager implement
writer = new SSTableWriter(newFilename,
expectedBloomFilterSize, StorageService.getPartitioner());
}
writer.append(row.key, row.buffer);
- totalkeysWritten++;
+ totalKeysWritten++;
}
}
finally
@@ -389,12 +372,53 @@ public class CompactionManager implement
if (writer != null)
{
- results.add(writer.closeAndOpenReader());
String format = "AntiCompacted to %s. %d/%d bytes for %d keys.
Time: %dms.";
long dTime = System.currentTimeMillis() - startTime;
- logger.info(String.format(format, writer.getFilename(),
SSTable.getTotalBytes(sstables), results.get(0).length(), totalkeysWritten,
dTime));
+ List<String> filenames = writer.getAllFilenames();
+ long length = new File(filenames.get(filenames.size()
-1)).length(); // Data file is last in the list
+ logger.info(String.format(format, writer.getFilename(),
SSTable.getTotalBytes(sstables), length, totalKeysWritten, dTime));
+ }
+ return writer;
+ }
+
+ /**
+ * This function is used to do the anti compaction process. It spits out
a file which has keys
+ * that belong to a given range. If the target is not specified it spits
out the file as a compacted file with the
+ * unnecessary ranges wiped out.
+ *
+ * @param cfs
+ * @param sstables
+ * @param ranges
+ * @param target
+ * @return
+ * @throws java.io.IOException
+ */
+ private List<String> doAntiCompaction(ColumnFamilyStore cfs,
Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress
target)
+ throws IOException
+ {
+ List<String> filenames = new ArrayList<String>(SSTable.FILES_ON_DISK);
+ SSTableWriter writer = antiCompactionHelper(cfs, sstables, ranges,
target);
+ if (writer != null)
+ {
+ writer.close();
+ filenames = writer.getAllFilenames();
}
+ return filenames;
+ }
+ /**
+ * Like doAntiCompaction(), but returns a List of SSTableReaders instead
of a list of filenames.
+ * @throws java.io.IOException
+ */
+ private List<SSTableReader>
doAntiCompactionReturnReaders(ColumnFamilyStore cfs, Collection<SSTableReader>
sstables, Collection<Range> ranges, InetAddress target)
+ throws IOException
+ {
+ List<SSTableReader> results = new ArrayList<SSTableReader>(1);
+ SSTableWriter writer = antiCompactionHelper(cfs, sstables, ranges,
target);
+ if (writer != null)
+ {
+ results.add(writer.closeAndOpenReader());
+ }
return results;
}
@@ -407,7 +431,7 @@ public class CompactionManager implement
private void doCleanupCompaction(ColumnFamilyStore cfs) throws IOException
{
Collection<SSTableReader> originalSSTables = cfs.getSSTables();
- List<SSTableReader> sstables = doAntiCompaction(cfs, originalSSTables,
StorageService.instance.getLocalRanges(cfs.getTable().name), null);
+ List<SSTableReader> sstables = doAntiCompactionReturnReaders(cfs,
originalSSTables, StorageService.instance.getLocalRanges(cfs.getTable().name),
null);
if (!sstables.isEmpty())
{
cfs.replaceCompactedSSTables(originalSSTables, sstables);
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java?rev=1041105&r1=1041104&r2=1041105&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java
Wed Dec 1 17:50:06 2010
@@ -282,9 +282,9 @@ public class Table
* do a complete compaction since we can figure out based on the ranges
* whether the files need to be split.
*/
- public List<SSTableReader> forceAntiCompaction(Collection<Range> ranges,
InetAddress target)
+ public List<String> forceAntiCompaction(Collection<Range> ranges,
InetAddress target)
{
- List<SSTableReader> allResults = new ArrayList<SSTableReader>();
+ List<String> allResults = new ArrayList<String>();
Set<String> columnFamilies = tableMetadata.getColumnFamilies();
for ( String columnFamily : columnFamilies )
{
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java?rev=1041105&r1=1041104&r2=1041105&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
Wed Dec 1 17:50:06 2010
@@ -114,7 +114,7 @@ public class SSTableWriter extends SSTab
/**
* Renames temporary SSTable files to valid data, index, and bloom filter
files
*/
- public SSTableReader closeAndOpenReader() throws IOException
+ public void close() throws IOException
{
// bloom filter
FileOutputStream fos = new FileOutputStream(filterFilename());
@@ -136,6 +136,14 @@ public class SSTableWriter extends SSTab
path = rename(path); // important to do this last since index & filter
file names are derived from it
indexSummary.complete();
+ }
+
+ /**
+ * Renames temporary SSTable files to valid data, index, and bloom filter
files and returns an SSTableReader
+ */
+ public SSTableReader closeAndOpenReader() throws IOException
+ {
+ this.close();
return new SSTableReader(path, partitioner, indexSummary, bf);
}
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1041105&r1=1041104&r2=1041105&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
Wed Dec 1 17:50:06 2010
@@ -485,12 +485,12 @@ public class AntiEntropyService
try
{
List<Range> ranges = new ArrayList<Range>(differences);
- final List<SSTableReader> sstables =
CompactionManager.instance.submitAnticompaction(cfstore, ranges, remote).get();
+ final List<String> filenames =
CompactionManager.instance.submitAnticompaction(cfstore, ranges, remote).get();
Future f =
StageManager.getStage(StageManager.STREAM_STAGE).submit(new WrappedRunnable()
{
protected void runMayThrow() throws Exception
{
- StreamOut.transferSSTables(remote, sstables, cf.left);
+ StreamOut.transferSSTables(remote, filenames, cf.left);
StreamOutManager.remove(remote);
}
});
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=1041105&r1=1041104&r2=1041105&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
Wed Dec 1 17:50:06 2010
@@ -112,19 +112,16 @@ public class StreamOut
* Transfers a group of sstables from a single table to the target endpoint
* and then marks them as ready for local deletion.
*/
- public static void transferSSTables(InetAddress target,
List<SSTableReader> sstables, String table) throws IOException
+ public static void transferSSTables(InetAddress target, List<String>
filenames, String table) throws IOException
{
- PendingFile[] pendingFiles = new PendingFile[SSTable.FILES_ON_DISK *
sstables.size()];
+ PendingFile[] pendingFiles = new PendingFile[filenames.size()];
int i = 0;
- for (SSTableReader sstable : sstables)
+ for (String filename : filenames)
{
- for (String filename : sstable.getAllFilenames())
- {
- File file = new File(filename);
- pendingFiles[i++] = new PendingFile(file.getAbsolutePath(),
file.length(), table);
- }
+ File file = new File(filename);
+ pendingFiles[i++] = new PendingFile(file.getAbsolutePath(),
file.length(), table);
}
- logger.info("Stream context metadata " +
StringUtils.join(pendingFiles, ", " + " " + sstables.size() + " sstables."));
+ logger.info("Stream context metadata " +
StringUtils.join(pendingFiles, ", " + " " + filenames.size() + " sstables."));
StreamOutManager.get(target).addFilesToStream(pendingFiles);
StreamInitiateMessage biMessage = new
StreamInitiateMessage(pendingFiles);
Message message =
StreamInitiateMessage.makeStreamInitiateMessage(biMessage);