Author: jbellis
Date: Wed Dec  1 18:00:37 2010
New Revision: 1041108

URL: http://svn.apache.org/viewvc?rev=1041108&view=rev
Log:
revert last to fix test failures

Modified:
    cassandra/branches/cassandra-0.6/CHANGES.txt
    
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
    cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java
    
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
    
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
    
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java

Modified: cassandra/branches/cassandra-0.6/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/CHANGES.txt?rev=1041108&r1=1041107&r2=1041108&view=diff
==============================================================================
--- cassandra/branches/cassandra-0.6/CHANGES.txt (original)
+++ cassandra/branches/cassandra-0.6/CHANGES.txt Wed Dec  1 18:00:37 2010
@@ -10,8 +10,6 @@
  * detect and warn when obsolete version of JNA is present (CASSANDRA-1770)
  * fix live-column-count of slice ranges including tombstoned supercolumn 
    with live subcolumn (CASSANDRA-1591)
- * avoid opening readers on anticompacted to-be-streamed temporary
-   files (CASSANDRA-1752)
 
 
 0.6.8

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java?rev=1041108&r1=1041107&r2=1041108&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/CompactionManager.java
 Wed Dec  1 18:00:37 2010
@@ -133,11 +133,11 @@ public class CompactionManager implement
         return executor.submit(runnable);
     }
 
-    public Future<List<String>> submitAnticompaction(final ColumnFamilyStore 
cfStore, final Collection<Range> ranges, final InetAddress target)
+    public Future<List<SSTableReader>> submitAnticompaction(final 
ColumnFamilyStore cfStore, final Collection<Range> ranges, final InetAddress 
target)
     {
-        Callable<List<String>> callable = new Callable<List<String>>()
+        Callable<List<SSTableReader>> callable = new 
Callable<List<SSTableReader>>()
         {
-            public List<String> call() throws IOException
+            public List<SSTableReader> call() throws IOException
             {
                 return doAntiCompaction(cfStore, cfStore.getSSTables(), 
ranges, target);
             }
@@ -320,7 +320,18 @@ public class CompactionManager implement
         return sstables.size();
     }
 
-    private SSTableWriter antiCompactionHelper(ColumnFamilyStore cfs, 
Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress 
target)
+    /**
+     * This function is used to do the anti compaction process , it spits out 
the file which has keys that belong to a given range
+     * If the target is not specified it spits out the file as a compacted 
file with the unecessary ranges wiped out.
+     *
+     * @param cfs
+     * @param sstables
+     * @param ranges
+     * @param target
+     * @return
+     * @throws java.io.IOException
+     */
+    private List<SSTableReader> doAntiCompaction(ColumnFamilyStore cfs, 
Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress 
target)
             throws IOException
     {
         Table table = cfs.getTable();
@@ -337,9 +348,10 @@ public class CompactionManager implement
             // compacting for streaming: send to subdirectory
             compactionFileLocation = compactionFileLocation + File.separator + 
DatabaseDescriptor.STREAMING_SUBDIR;
         }
+        List<SSTableReader> results = new ArrayList<SSTableReader>();
 
         long startTime = System.currentTimeMillis();
-        long totalKeysWritten = 0;
+        long totalkeysWritten = 0;
 
         int expectedBloomFilterSize = 
Math.max(DatabaseDescriptor.getIndexInterval(), 
(int)(SSTableReader.getApproximateKeyCount(sstables) / 2));
         if (logger.isDebugEnabled())
@@ -352,6 +364,11 @@ public class CompactionManager implement
 
         try
         {
+            if (!nni.hasNext())
+            {
+                return results;
+            }
+
             while (nni.hasNext())
             {
                 CompactionIterator.CompactedRow row = nni.next();
@@ -362,7 +379,7 @@ public class CompactionManager implement
                     writer = new SSTableWriter(newFilename, 
expectedBloomFilterSize, StorageService.getPartitioner());
                 }
                 writer.append(row.key, row.buffer);
-                totalKeysWritten++;
+                totalkeysWritten++;
             }
         }
         finally
@@ -372,53 +389,12 @@ public class CompactionManager implement
 
         if (writer != null)
         {
+            results.add(writer.closeAndOpenReader());
             String format = "AntiCompacted to %s.  %d/%d bytes for %d keys.  
Time: %dms.";
             long dTime = System.currentTimeMillis() - startTime;
-            List<String> filenames = writer.getAllFilenames();
-            long length = new File(filenames.get(filenames.size() 
-1)).length(); // Data file is last in the list
-            logger.info(String.format(format, writer.getFilename(), 
SSTable.getTotalBytes(sstables), length, totalKeysWritten, dTime));
-        }
-        return writer;
-    }
-
-    /**
-     * This function is used to do the anti compaction process.  It spits out 
a file which has keys
-     * that belong to a given range. If the target is not specified it spits 
out the file as a compacted file with the
-     * unnecessary ranges wiped out.
-     *
-     * @param cfs
-     * @param sstables
-     * @param ranges
-     * @param target
-     * @return
-     * @throws java.io.IOException
-     */
-    private List<String> doAntiCompaction(ColumnFamilyStore cfs, 
Collection<SSTableReader> sstables, Collection<Range> ranges, InetAddress 
target)
-            throws IOException
-    {
-        List<String> filenames = new ArrayList<String>(SSTable.FILES_ON_DISK);
-        SSTableWriter writer = antiCompactionHelper(cfs, sstables, ranges, 
target);
-        if (writer != null)
-        {
-            writer.close();
-            filenames = writer.getAllFilenames();
+            logger.info(String.format(format, writer.getFilename(), 
SSTable.getTotalBytes(sstables), results.get(0).length(), totalkeysWritten, 
dTime));
         }
-        return filenames;
-    }
 
-    /**
-     * Like doAntiCompaction(), but returns an List of SSTableReaders instead 
of a list of filenames.
-     * @throws java.io.IOException
-     */
-    private List<SSTableReader> 
doAntiCompactionReturnReaders(ColumnFamilyStore cfs, Collection<SSTableReader> 
sstables, Collection<Range> ranges, InetAddress target)
-            throws IOException
-    {
-        List<SSTableReader> results = new ArrayList<SSTableReader>(1);
-        SSTableWriter writer = antiCompactionHelper(cfs, sstables, ranges, 
target);
-        if (writer != null)
-        {
-            results.add(writer.closeAndOpenReader());
-        }
         return results;
     }
 
@@ -431,7 +407,7 @@ public class CompactionManager implement
     private void doCleanupCompaction(ColumnFamilyStore cfs) throws IOException
     {
         Collection<SSTableReader> originalSSTables = cfs.getSSTables();
-        List<SSTableReader> sstables = doAntiCompactionReturnReaders(cfs, 
originalSSTables, StorageService.instance.getLocalRanges(cfs.getTable().name), 
null);
+        List<SSTableReader> sstables = doAntiCompaction(cfs, originalSSTables, 
StorageService.instance.getLocalRanges(cfs.getTable().name), null);
         if (!sstables.isEmpty())
         {
             cfs.replaceCompactedSSTables(originalSSTables, sstables);

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java?rev=1041108&r1=1041107&r2=1041108&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java 
(original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/db/Table.java 
Wed Dec  1 18:00:37 2010
@@ -282,9 +282,9 @@ public class Table 
      * do a complete compaction since we can figure out based on the ranges
      * whether the files need to be split.
     */
-    public List<String> forceAntiCompaction(Collection<Range> ranges, 
InetAddress target)
+    public List<SSTableReader> forceAntiCompaction(Collection<Range> ranges, 
InetAddress target)
     {
-        List<String> allResults = new ArrayList<String>();
+        List<SSTableReader> allResults = new ArrayList<SSTableReader>();
         Set<String> columnFamilies = tableMetadata.getColumnFamilies();
         for ( String columnFamily : columnFamilies )
         {

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java?rev=1041108&r1=1041107&r2=1041108&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/io/SSTableWriter.java
 Wed Dec  1 18:00:37 2010
@@ -114,7 +114,7 @@ public class SSTableWriter extends SSTab
     /**
      * Renames temporary SSTable files to valid data, index, and bloom filter 
files
      */
-    public void close() throws IOException
+    public SSTableReader closeAndOpenReader() throws IOException
     {
         // bloom filter
         FileOutputStream fos = new FileOutputStream(filterFilename());
@@ -136,14 +136,6 @@ public class SSTableWriter extends SSTab
         path = rename(path); // important to do this last since index & filter 
file names are derived from it
 
         indexSummary.complete();
-    }
-    
-    /**
-     * Renames temporary SSTable files to valid data, index, and bloom filter 
files and returns an SSTableReader
-     */
-    public SSTableReader closeAndOpenReader() throws IOException
-    {
-        this.close();
         return new SSTableReader(path, partitioner, indexSummary, bf);
     }
 

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1041108&r1=1041107&r2=1041108&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/service/AntiEntropyService.java
 Wed Dec  1 18:00:37 2010
@@ -485,12 +485,12 @@ public class AntiEntropyService
             try
             {
                 List<Range> ranges = new ArrayList<Range>(differences);
-                final List<String> filenames = 
CompactionManager.instance.submitAnticompaction(cfstore, ranges, remote).get();
+                final List<SSTableReader> sstables = 
CompactionManager.instance.submitAnticompaction(cfstore, ranges, remote).get();
                 Future f = 
StageManager.getStage(StageManager.STREAM_STAGE).submit(new WrappedRunnable() 
                 {
                     protected void runMayThrow() throws Exception
                     {
-                        StreamOut.transferSSTables(remote, filenames, cf.left);
+                        StreamOut.transferSSTables(remote, sstables, cf.left);
                         StreamOutManager.remove(remote);
                     }
                 });

Modified: 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
URL: 
http://svn.apache.org/viewvc/cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java?rev=1041108&r1=1041107&r2=1041108&view=diff
==============================================================================
--- 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
 (original)
+++ 
cassandra/branches/cassandra-0.6/src/java/org/apache/cassandra/streaming/StreamOut.java
 Wed Dec  1 18:00:37 2010
@@ -112,16 +112,19 @@ public class StreamOut
      * Transfers a group of sstables from a single table to the target endpoint
      * and then marks them as ready for local deletion.
      */
-    public static void transferSSTables(InetAddress target, List<String> 
filenames, String table) throws IOException
+    public static void transferSSTables(InetAddress target, 
List<SSTableReader> sstables, String table) throws IOException
     {
-        PendingFile[] pendingFiles = new PendingFile[filenames.size()];
+        PendingFile[] pendingFiles = new PendingFile[SSTable.FILES_ON_DISK * 
sstables.size()];
         int i = 0;
-        for (String filename : filenames)
+        for (SSTableReader sstable : sstables)
         {
-            File file = new File(filename);
-            pendingFiles[i++] = new PendingFile(file.getAbsolutePath(), 
file.length(), table);
+            for (String filename : sstable.getAllFilenames())
+            {
+                File file = new File(filename);
+                pendingFiles[i++] = new PendingFile(file.getAbsolutePath(), 
file.length(), table);
+            }
         }
-        logger.info("Stream context metadata " + 
StringUtils.join(pendingFiles, ", " + " " + filenames.size() + " sstables."));
+        logger.info("Stream context metadata " + 
StringUtils.join(pendingFiles, ", " + " " + sstables.size() + " sstables."));
         StreamOutManager.get(target).addFilesToStream(pendingFiles);
         StreamInitiateMessage biMessage = new 
StreamInitiateMessage(pendingFiles);
         Message message = 
StreamInitiateMessage.makeStreamInitiateMessage(biMessage);


Reply via email to