Author: jbellis
Date: Wed Dec 1 18:00:37 2010
New Revision: 1041108
URL: http://svn.apache.org/viewvc?rev=1041108&view=rev
Log:
Revert the previous commit (CASSANDRA-1752) to fix test failures.
Modified:
cassandra/branches/cassandra-0.6/CHANGES.txt
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=1041108&r1=1041107&r2=1041108&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Wed Dec 1 18:00:37 2010
@@ -10,8 +10,6 @@
* detect and warn when obsolete version of JNA is present (CASSANDRA-1770)
* fix live-column-count of slice ranges including tombstoned supercolumn
with live subcolumn (CASSANDRA-1591)
- * avoid opening readers on anticompacted to-be-streamed temporary
- files (CASSANDRA-1752)
0.6.8
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1041108&r1=1041107&r2=1041108&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
Wed Dec 1 18:00:37 2010
@@ -133,11 +133,11 @@ public class CompactionManager implement
return executor.submit(runnable);
}
- public Future<List<String>> submitAnticompaction(final ColumnFamilyStore
cfStore, final Collection<Range> ranges, final InetAddress target)
+ public Future<List<SSTableReader>> submitAnticompaction(final
ColumnFamilyStore cfStore, final Collection<Range> ranges, final InetAddress
target)
{
- Callable<List<String>> callable = new Callable<List<String>>()
+ Callable<List<SSTableReader>> callable = new
Callable<List<SSTableReader>>()
{
- public List<String> call() throws IOException
+ public List<SSTableReader> call() throws IOException
{
return doAntiCompaction(cfStore, cfStore.getSSTables(),
ranges, target);
}
@@ -320,7 +320,18 @@ public class CompactionManager implement
return sstables.size();
}
- private SSTableWriter antiCompactionHelper(ColumnFamilyStore cfs,
Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress
target)
+ /**
+ * This function is used to do the anti compaction process , it spits out
the file which has keys that belong to a given range
+ * If the target is not specified it spits out the file as a compacted
file with the unecessary ranges wiped out.
+ *
+ * @param cfs
+ * @param sstables
+ * @param ranges
+ * @param target
+ * @return
+ * @throws java.io.IOException
+ */
+ private List<SSTableReader> doAntiCompaction(ColumnFamilyStore cfs,
Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress
target)
throws IOException
{
Table table = cfs.getTable();
@@ -337,9 +348,10 @@ public class CompactionManager implement
// compacting for streaming: send to subdirectory
compactionFileLocation = compactionFileLocation + File.separator +
DatabaseDescriptor.STREAMING_SUBDIR;
}
+ List<SSTableReader> results = new ArrayList<SSTableReader>();
long startTime = System.currentTimeMillis();
- long totalKeysWritten = 0;
+ long totalkeysWritten = 0;
int expectedBloomFilterSize =
Math.max(DatabaseDescriptor.getIndexInterval(),
(int)(SSTableReader.getApproximateKeyCount(sstables) / 2));
if (logger.isDebugEnabled())
@@ -352,6 +364,11 @@ public class CompactionManager implement
try
{
+ if (!nni.hasNext())
+ {
+ return results;
+ }
+
while (nni.hasNext())
{
CompactionIterator.CompactedRow row = nni.next();
@@ -362,7 +379,7 @@ public class CompactionManager implement
writer = new SSTableWriter(newFilename,
expectedBloomFilterSize, StorageService.getPartitioner());
}
writer.append(row.key, row.buffer);
- totalKeysWritten++;
+ totalkeysWritten++;
}
}
finally
@@ -372,53 +389,12 @@ public class CompactionManager implement
if (writer != null)
{
+ results.add(writer.closeAndOpenReader());
String format = "AntiCompacted to %s. %d/%d bytes for %d keys.
Time: %dms.";
long dTime = System.currentTimeMillis() - startTime;
- List<String> filenames = writer.getAllFilenames();
- long length = new File(filenames.get(filenames.size()
-1)).length(); // Data file is last in the list
- logger.info(String.format(format, writer.getFilename(),
SSTable.getTotalBytes(sstables), length, totalKeysWritten, dTime));
- }
- return writer;
- }
-
- /**
- * This function is used to do the anti compaction process. It spits out
a file which has keys
- * that belong to a given range. If the target is not specified it spits
out the file as a compacted file with the
- * unnecessary ranges wiped out.
- *
- * @param cfs
- * @param sstables
- * @param ranges
- * @param target
- * @return
- * @throws java.io.IOException
- */
- private List<String> doAntiCompaction(ColumnFamilyStore cfs,
Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress
target)
- throws IOException
- {
- List<String> filenames = new ArrayList<String>(SSTable.FILES_ON_DISK);
- SSTableWriter writer = antiCompactionHelper(cfs, sstables, ranges,
target);
- if (writer != null)
- {
- writer.close();
- filenames = writer.getAllFilenames();
+ logger.info(String.format(format, writer.getFilename(),
SSTable.getTotalBytes(sstables), results.get(0).length(), totalkeysWritten,
dTime));
}
- return filenames;
- }
- /**
- * Like doAntiCompaction(), but returns an List of SSTableReaders instead
of a list of filenames.
- * @throws java.io.IOException
- */
- private List<SSTableReader>
doAntiCompactionReturnReaders(ColumnFamilyStore cfs, Collection<SSTableReader>
sstables, Collection<Range> ranges, InetAddress target)
- throws IOException
- {
- List<SSTableReader> results = new ArrayList<SSTableReader>(1);
- SSTableWriter writer = antiCompactionHelper(cfs, sstables, ranges,
target);
- if (writer != null)
- {
- results.add(writer.closeAndOpenReader());
- }
return results;
}
@@ -431,7 +407,7 @@ public class CompactionManager implement
private void doCleanupCompaction(ColumnFamilyStore cfs) throws IOException
{
Collection<SSTableReader> originalSSTables = cfs.getSSTables();
- List<SSTableReader> sstables = doAntiCompactionReturnReaders(cfs,
originalSSTables, StorageService.instance.getLocalRanges(cfs.getTable().name),
null);
+ List<SSTableReader> sstables = doAntiCompaction(cfs, originalSSTables,
StorageService.instance.getLocalRanges(cfs.getTable().name), null);
if (!sstables.isEmpty())
{
cfs.replaceCompactedSSTables(originalSSTables, sstables);
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java?rev=1041108&r1=1041107&r2=1041108&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java
Wed Dec 1 18:00:37 2010
@@ -282,9 +282,9 @@ public class Table
* do a complete compaction since we can figure out based on the ranges
* whether the files need to be split.
*/
- public List<String> forceAntiCompaction(Collection<Range> ranges,
InetAddress target)
+ public List<SSTableReader> forceAntiCompaction(Collection<Range> ranges,
InetAddress target)
{
- List<String> allResults = new ArrayList<String>();
+ List<SSTableReader> allResults = new ArrayList<SSTableReader>();
Set<String> columnFamilies = tableMetadata.getColumnFamilies();
for ( String columnFamily : columnFamilies )
{
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java?rev=1041108&r1=1041107&r2=1041108&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
Wed Dec 1 18:00:37 2010
@@ -114,7 +114,7 @@ public class SSTableWriter extends SSTab
/**
* Renames temporary SSTable files to valid data, index, and bloom filter
files
*/
- public void close() throws IOException
+ public SSTableReader closeAndOpenReader() throws IOException
{
// bloom filter
FileOutputStream fos = new FileOutputStream(filterFilename());
@@ -136,14 +136,6 @@ public class SSTableWriter extends SSTab
path = rename(path); // important to do this last since index & filter
file names are derived from it
indexSummary.complete();
- }
-
- /**
- * Renames temporary SSTable files to valid data, index, and bloom filter
files and returns an SSTableReader
- */
- public SSTableReader closeAndOpenReader() throws IOException
- {
- this.close();
return new SSTableReader(path, partitioner, indexSummary, bf);
}
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1041108&r1=1041107&r2=1041108&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
Wed Dec 1 18:00:37 2010
@@ -485,12 +485,12 @@ public class AntiEntropyService
try
{
List<Range> ranges = new ArrayList<Range>(differences);
- final List<String> filenames =
CompactionManager.instance.submitAnticompaction(cfstore, ranges, remote).get();
+ final List<SSTableReader> sstables =
CompactionManager.instance.submitAnticompaction(cfstore, ranges, remote).get();
Future f =
StageManager.getStage(StageManager.STREAM_STAGE).submit(new WrappedRunnable()
{
protected void runMayThrow() throws Exception
{
- StreamOut.transferSSTables(remote, filenames, cf.left);
+ StreamOut.transferSSTables(remote, sstables, cf.left);
StreamOutManager.remove(remote);
}
});
Modified:
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
URL:
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=1041108&r1=1041107&r2=1041108&view=diff
==============================================================================
---
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
(original)
+++
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
Wed Dec 1 18:00:37 2010
@@ -112,16 +112,19 @@ public class StreamOut
* Transfers a group of sstables from a single table to the target endpoint
* and then marks them as ready for local deletion.
*/
- public static void transferSSTables(InetAddress target, List<String>
filenames, String table) throws IOException
+ public static void transferSSTables(InetAddress target,
List<SSTableReader> sstables, String table) throws IOException
{
- PendingFile[] pendingFiles = new PendingFile[filenames.size()];
+ PendingFile[] pendingFiles = new PendingFile[SSTable.FILES_ON_DISK *
sstables.size()];
int i = 0;
- for (String filename : filenames)
+ for (SSTableReader sstable : sstables)
{
- File file = new File(filename);
- pendingFiles[i++] = new PendingFile(file.getAbsolutePath(),
file.length(), table);
+ for (String filename : sstable.getAllFilenames())
+ {
+ File file = new File(filename);
+ pendingFiles[i++] = new PendingFile(file.getAbsolutePath(),
file.length(), table);
+ }
}
- logger.info("Stream context metadata " +
StringUtils.join(pendingFiles, ", " + " " + filenames.size() + " sstables."));
+ logger.info("Stream context metadata " +
StringUtils.join(pendingFiles, ", " + " " + sstables.size() + " sstables."));
StreamOutManager.get(target).addFilesToStream(pendingFiles);
StreamInitiateMessage biMessage = new
StreamInitiateMessage(pendingFiles);
Message message =
StreamInitiateMessage.makeStreamInitiateMessage(biMessage);