Author: jbellis
Date: Thu Jun 23 05:49:35 2011
New Revision: 1138740

URL: http://svn.apache.org/viewvc?rev=1138740&view=rev
Log:
clean up tmpfiles after failed compaction
patch by Aaron Morton; reviewed by slebresne and Stu Hood for CASSANDRA-2468

Modified:
    cassandra/trunk/CHANGES.txt
    cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
    cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
    
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
    
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
    
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingReference.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
    cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
    cassandra/trunk/src/java/org/apache/cassandra/io/util/FileUtils.java
    cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java

Modified: cassandra/trunk/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/CHANGES.txt (original)
+++ cassandra/trunk/CHANGES.txt Thu Jun 23 05:49:35 2011
@@ -7,6 +7,7 @@
    (CASSANDRA-2062)
  * Fixed the ability to set compaction strategy in cli using create column 
family command (CASSANDRA-2778)
  * Add startup flag to renew counter node id (CASSANDRA-2788)
+ * clean up tmp files after failed compaction (CASSANDRA-2468)
 
 
 0.8.2

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/ColumnFamilyStore.java Thu 
Jun 23 05:49:35 2011
@@ -454,7 +454,14 @@ public class ColumnFamilyStore implement
 
             if (components.contains(Component.COMPACTED_MARKER) || 
desc.temporary)
             {
-                SSTable.delete(desc, components);
+                try
+                {
+                    SSTable.delete(desc, components);
+                }
+                catch (IOException e)
+                {
+                    throw new IOError(e);
+                }
                 continue;
             }
 

Modified: cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/db/Memtable.java Thu Jun 23 
05:49:35 2011
@@ -224,14 +224,22 @@ public class Memtable
                                       + keySize // keys in data file
                                       + currentThroughput.get()) // data
                                      * 1.2); // bloom filter and row index 
overhead
+        SSTableReader ssTable;
+        // errors when creating the writer that may leave empty temp files.
         SSTableWriter writer = cfs.createFlushWriter(columnFamilies.size(), 
estimatedSize, context);
+        try
+        {
+            // (we can't clear out the map as-we-go to free up memory,
+            //  since the memtable is being used for queries in the "pending 
flush" category)
+            for (Map.Entry<DecoratedKey, ColumnFamily> entry : 
columnFamilies.entrySet())
+                writer.append(entry.getKey(), entry.getValue());
 
-        // (we can't clear out the map as-we-go to free up memory,
-        //  since the memtable is being used for queries in the "pending 
flush" category)
-        for (Map.Entry<DecoratedKey, ColumnFamily> entry : 
columnFamilies.entrySet())
-            writer.append(entry.getKey(), entry.getValue());
-
-        SSTableReader ssTable = writer.closeAndOpenReader();
+            ssTable = writer.closeAndOpenReader();
+        }
+        finally
+        {
+            writer.cleanupIfNecessary();
+        }
         logger.info(String.format("Completed flushing %s (%d bytes)",
                                   ssTable.getFilename(), new 
File(ssTable.getFilename()).length()));
         return ssTable;

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
 Thu Jun 23 05:49:35 2011
@@ -469,131 +469,142 @@ public class CompactionManager implement
                 assert firstRowPositionFromIndex == 0 : 
firstRowPositionFromIndex;
             }
 
+            // errors when creating the writer may leave empty temp files.
             SSTableWriter writer = maybeCreateWriter(cfs, 
compactionFileLocation, expectedBloomFilterSize, null, 
Collections.singletonList(sstable));
+            SSTableReader newSstable = null;
             executor.beginCompaction(new ScrubInfo(dataFile, sstable));
             int goodRows = 0, badRows = 0, emptyRows = 0;
 
-            while (!dataFile.isEOF())
+            try
             {
-                long rowStart = dataFile.getFilePointer();
-                if (logger.isDebugEnabled())
-                    logger.debug("Reading row at " + rowStart);
-
-                DecoratedKey key = null;
-                long dataSize = -1;
-                try
+                while (!dataFile.isEOF())
                 {
-                    key = SSTableReader.decodeKey(sstable.partitioner, 
sstable.descriptor, ByteBufferUtil.readWithShortLength(dataFile));
-                    dataSize = sstable.descriptor.hasIntRowSize ? 
dataFile.readInt() : dataFile.readLong();
+                    long rowStart = dataFile.getFilePointer();
                     if (logger.isDebugEnabled())
-                        logger.debug(String.format("row %s is %s bytes", 
ByteBufferUtil.bytesToHex(key.key), dataSize));
-                }
-                catch (Throwable th)
-                {
-                    throwIfFatal(th);
-                    // check for null key below
-                }
-
-                ByteBuffer currentIndexKey = nextIndexKey;
-                long nextRowPositionFromIndex;
-                try
-                {
-                    nextIndexKey = indexFile.isEOF() ? null : 
ByteBufferUtil.readWithShortLength(indexFile);
-                    nextRowPositionFromIndex = indexFile.isEOF() ? 
dataFile.length() : indexFile.readLong();
-                }
-                catch (Throwable th)
-                {
-                    logger.warn("Error reading index file", th);
-                    nextIndexKey = null;
-                    nextRowPositionFromIndex = dataFile.length();
-                }
-
-                long dataStart = dataFile.getFilePointer();
-                long dataStartFromIndex = currentIndexKey == null
-                                        ? -1
-                                        : rowStart + 2 + 
currentIndexKey.remaining() + (sstable.descriptor.hasIntRowSize ? 4 : 8);
-                long dataSizeFromIndex = nextRowPositionFromIndex - 
dataStartFromIndex;
-                assert currentIndexKey != null || indexFile.isEOF();
-                if (logger.isDebugEnabled() && currentIndexKey != null)
-                    logger.debug(String.format("Index doublecheck: row %s is 
%s bytes", ByteBufferUtil.bytesToHex(currentIndexKey),  dataSizeFromIndex));
+                        logger.debug("Reading row at " + rowStart);
 
-                writer.mark();
-                try
-                {
-                    if (key == null)
-                        throw new IOError(new IOException("Unable to read row 
key from data file"));
-                    if (dataSize > dataFile.length())
-                        throw new IOError(new IOException("Impossible row size 
" + dataSize));
-                    SSTableIdentityIterator row = new 
SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
-                    AbstractCompactedRow compactedRow = 
controller.getCompactedRow(row);
-                    if (compactedRow.isEmpty())
+                    DecoratedKey key = null;
+                    long dataSize = -1;
+                    try
                     {
-                        emptyRows++;
+                        key = SSTableReader.decodeKey(sstable.partitioner, 
sstable.descriptor, ByteBufferUtil.readWithShortLength(dataFile));
+                        dataSize = sstable.descriptor.hasIntRowSize ? 
dataFile.readInt() : dataFile.readLong();
+                        if (logger.isDebugEnabled())
+                            logger.debug(String.format("row %s is %s bytes", 
ByteBufferUtil.bytesToHex(key.key), dataSize));
                     }
-                    else
+                    catch (Throwable th)
                     {
-                        writer.append(compactedRow);
-                        goodRows++;
+                        throwIfFatal(th);
+                        // check for null key below
                     }
-                    if (!key.key.equals(currentIndexKey) || dataStart != 
dataStartFromIndex)
-                        logger.warn("Row scrubbed successfully but index file 
contains a different key or row size; consider rebuilding the index as 
described in 
http://www.mail-archive.com/user@cassandra.apache.org/msg03325.html");
-                }
-                catch (Throwable th)
-                {
-                    throwIfFatal(th);
-                    logger.warn("Non-fatal error reading row (stacktrace 
follows)", th);
-                    writer.reset();
 
-                    if (currentIndexKey != null
-                        && (key == null || !key.key.equals(currentIndexKey) || 
dataStart != dataStartFromIndex || dataSize != dataSizeFromIndex))
+                    ByteBuffer currentIndexKey = nextIndexKey;
+                    long nextRowPositionFromIndex;
+                    try
+                    {
+                        nextIndexKey = indexFile.isEOF() ? null : 
ByteBufferUtil.readWithShortLength(indexFile);
+                        nextRowPositionFromIndex = indexFile.isEOF() ? 
dataFile.length() : indexFile.readLong();
+                    }
+                    catch (Throwable th)
                     {
-                        logger.info(String.format("Retrying from row index; 
data is %s bytes starting at %s",
-                                                  dataSizeFromIndex, 
dataStartFromIndex));
-                        key = SSTableReader.decodeKey(sstable.partitioner, 
sstable.descriptor, currentIndexKey);
-                        try
+                        logger.warn("Error reading index file", th);
+                        nextIndexKey = null;
+                        nextRowPositionFromIndex = dataFile.length();
+                    }
+
+                    long dataStart = dataFile.getFilePointer();
+                    long dataStartFromIndex = currentIndexKey == null
+                                            ? -1
+                                            : rowStart + 2 + 
currentIndexKey.remaining() + (sstable.descriptor.hasIntRowSize ? 4 : 8);
+                    long dataSizeFromIndex = nextRowPositionFromIndex - 
dataStartFromIndex;
+                    assert currentIndexKey != null || indexFile.isEOF();
+                    if (logger.isDebugEnabled() && currentIndexKey != null)
+                        logger.debug(String.format("Index doublecheck: row %s 
is %s bytes", ByteBufferUtil.bytesToHex(currentIndexKey),  dataSizeFromIndex));
+
+                    writer.mark();
+                    try
+                    {
+                        if (key == null)
+                            throw new IOError(new IOException("Unable to read 
row key from data file"));
+                        if (dataSize > dataFile.length())
+                            throw new IOError(new IOException("Impossible row 
size " + dataSize));
+                        SSTableIdentityIterator row = new 
SSTableIdentityIterator(sstable, dataFile, key, dataStart, dataSize, true);
+                        AbstractCompactedRow compactedRow = 
controller.getCompactedRow(row);
+                        if (compactedRow.isEmpty())
+                        {
+                            emptyRows++;
+                        }
+                        else
+                        {
+                            writer.append(compactedRow);
+                            goodRows++;
+                        }
+                        if (!key.key.equals(currentIndexKey) || dataStart != 
dataStartFromIndex)
+                            logger.warn("Row scrubbed successfully but index 
file contains a different key or row size; consider rebuilding the index as 
described in 
http://www.mail-archive.com/user@cassandra.apache.org/msg03325.html");
+                    }
+                    catch (Throwable th)
+                    {
+                        throwIfFatal(th);
+                        logger.warn("Non-fatal error reading row (stacktrace 
follows)", th);
+                        writer.reset();
+
+                        if (currentIndexKey != null
+                            && (key == null || 
!key.key.equals(currentIndexKey) || dataStart != dataStartFromIndex || dataSize 
!= dataSizeFromIndex))
                         {
-                            SSTableIdentityIterator row = new 
SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, 
dataSizeFromIndex, true);
-                            AbstractCompactedRow compactedRow = 
controller.getCompactedRow(row);
-                            if (compactedRow.isEmpty())
+                            logger.info(String.format("Retrying from row 
index; data is %s bytes starting at %s",
+                                                      dataSizeFromIndex, 
dataStartFromIndex));
+                            key = SSTableReader.decodeKey(sstable.partitioner, 
sstable.descriptor, currentIndexKey);
+                            try
                             {
-                                emptyRows++;
+                                SSTableIdentityIterator row = new 
SSTableIdentityIterator(sstable, dataFile, key, dataStartFromIndex, 
dataSizeFromIndex, true);
+                                AbstractCompactedRow compactedRow = 
controller.getCompactedRow(row);
+                                if (compactedRow.isEmpty())
+                                {
+                                    emptyRows++;
+                                }
+                                else
+                                {
+                                    writer.append(compactedRow);
+                                    goodRows++;
+                                }
                             }
-                            else
+                            catch (Throwable th2)
                             {
-                                writer.append(compactedRow);
-                                goodRows++;
+                                throwIfFatal(th2);
+                                // Skipping rows is dangerous for counters 
(see CASSANDRA-2759)
+                                if (isCommutative)
+                                    throw new IOError(th2);
+
+                                logger.warn("Retry failed too.  Skipping to 
next row (retry's stacktrace follows)", th2);
+                                writer.reset();
+                                dataFile.seek(nextRowPositionFromIndex);
+                                badRows++;
                             }
                         }
-                        catch (Throwable th2)
+                        else
                         {
-                            throwIfFatal(th2);
                             // Skipping rows is dangerous for counters (see 
CASSANDRA-2759)
                             if (isCommutative)
-                                throw new IOError(th2);
+                                throw new IOError(th);
 
-                            logger.warn("Retry failed too.  Skipping to next 
row (retry's stacktrace follows)", th2);
-                            writer.reset();
-                            dataFile.seek(nextRowPositionFromIndex);
+                            logger.warn("Row at " + dataStart + " is 
unreadable; skipping to next");
+                            if (currentIndexKey != null)
+                                dataFile.seek(nextRowPositionFromIndex);
                             badRows++;
                         }
                     }
-                    else
-                    {
-                        // Skipping rows is dangerous for counters (see 
CASSANDRA-2759)
-                        if (isCommutative)
-                            throw new IOError(th);
-
-                        logger.warn("Row at " + dataStart + " is unreadable; 
skipping to next");
-                        if (currentIndexKey != null)
-                            dataFile.seek(nextRowPositionFromIndex);
-                        badRows++;
-                    }
                 }
+
+                if (writer.getFilePointer() > 0)
+                    newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
+            }
+            finally
+            {
+                writer.cleanupIfNecessary();
             }
 
-            if (writer.getFilePointer() > 0)
+            if (newSstable != null)
             {
-                SSTableReader newSstable = 
writer.closeAndOpenReader(sstable.maxDataAge);
                 cfs.replaceCompactedSSTables(Arrays.asList(sstable), 
Arrays.asList(newSstable));
                 logger.info("Scrub of " + sstable + " complete: " + goodRows + 
" rows in new sstable and " + emptyRows + " empty (tombstoned) rows dropped");
                 if (badRows > 0)
@@ -652,6 +663,7 @@ public class CompactionManager implement
               logger.debug("Expected bloom filter size : " + 
expectedBloomFilterSize);
 
             SSTableWriter writer = null;
+            SSTableReader newSstable = null;
 
             logger.info("Cleaning up " + sstable);
             // Calculate the expected compacted filesize
@@ -691,17 +703,21 @@ public class CompactionManager implement
                         }
                     }
                 }
+                if (writer != null)
+                    newSstable = writer.closeAndOpenReader(sstable.maxDataAge);
             }
             finally
             {
                 scanner.close();
                 executor.finishCompaction(ci);
+                if (writer != null)
+                    writer.cleanupIfNecessary();
+                executor.finishCompaction(ci);
             }
 
             List<SSTableReader> results = new ArrayList<SSTableReader>();
-            if (writer != null)
+            if (newSstable != null)
             {
-                SSTableReader newSstable = 
writer.closeAndOpenReader(sstable.maxDataAge);
                 results.add(newSstable);
 
                 String format = "Cleaned up to %s.  %,d to %,d (~%d%% of 
original) bytes for %,d keys.  Time: %,dms.";

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java 
(original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/db/compaction/CompactionTask.java 
Thu Jun 23 05:49:35 2011
@@ -127,7 +127,8 @@ public class CompactionTask extends Abst
         if (logger.isDebugEnabled())
             logger.debug("Expected bloom filter size : " + 
expectedBloomFilterSize);
 
-        SSTableWriter writer;
+        SSTableWriter writer = null;
+        final SSTableReader ssTable;
         CompactionIterator ci = new CompactionIterator(type, toCompact, 
controller); // retain a handle so we can call close()
         Iterator<AbstractCompactedRow> nni = Iterators.filter(ci, 
Predicates.notNull());
         Map<DecoratedKey, Long> cachedKeys = new HashMap<DecoratedKey, Long>();
@@ -164,15 +165,17 @@ public class CompactionTask extends Abst
                     }
                 }
             }
+            ssTable = writer.closeAndOpenReader(getMaxDataAge(toCompact));
         }
         finally
         {
             ci.close();
             if (collector != null)
                 collector.finishCompaction(ci);
+            if (writer != null)
+                writer.cleanupIfNecessary();
         }
 
-        SSTableReader ssTable = 
writer.closeAndOpenReader(getMaxDataAge(toCompact));
         cfs.replaceCompactedSSTables(toCompact, Arrays.asList(ssTable));
         for (Entry<DecoratedKey, Long> entry : cachedKeys.entrySet()) // empty 
if preheat is off
             ssTable.cacheKey(entry.getKey(), entry.getValue());

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/Descriptor.java 
Thu Jun 23 05:49:35 2011
@@ -55,6 +55,21 @@ public class Descriptor
     public final boolean isLatestVersion;
     public final boolean usesOldBloomFilter;
 
+    public enum TempState
+    {
+        LIVE,
+        TEMP,
+        ANY;
+
+        boolean isMatch(Descriptor descriptor)
+        {
+            assert descriptor != null;
+            if (TempState.ANY == this)
+                return true;
+            return (TempState.TEMP == this) ? descriptor.temporary : 
!descriptor.temporary;
+        }
+    }
+
     /**
      * A descriptor that assumes CURRENT_VERSION.
      */

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTable.java Thu 
Jun 23 05:49:35 2011
@@ -21,7 +21,6 @@ package org.apache.cassandra.io.sstable;
 
 import java.io.File;
 import java.io.FilenameFilter;
-import java.io.IOError;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
@@ -137,26 +136,20 @@ public abstract class SSTable
      *
      * @return true if the file was deleted
      */
-    public static boolean delete(Descriptor desc, Set<Component> components)
+    public static boolean delete(Descriptor desc, Set<Component> components) 
throws IOException
     {
-        try
-        {
-            // remove the DATA component first if it exists
-            if (components.contains(Component.DATA))
-                FileUtils.deleteWithConfirm(desc.filenameFor(Component.DATA));
-            for (Component component : components)
-            {
-                if (component.equals(Component.DATA) || 
component.equals(Component.COMPACTED_MARKER))
-                    continue;
-                FileUtils.deleteWithConfirm(desc.filenameFor(component));
-            }
-            // remove the COMPACTED_MARKER component last if it exists
-            FileUtils.delete(desc.filenameFor(Component.COMPACTED_MARKER));
-        }
-        catch (IOException e)
+        // remove the DATA component first if it exists
+        if (components.contains(Component.DATA))
+            FileUtils.deleteWithConfirm(desc.filenameFor(Component.DATA));
+        for (Component component : components)
         {
-            throw new IOError(e);
+            if (component.equals(Component.DATA) || 
component.equals(Component.COMPACTED_MARKER))
+                continue;
+            FileUtils.deleteWithConfirm(desc.filenameFor(component));
         }
+        // remove the COMPACTED_MARKER component last if it exists
+        FileUtils.delete(desc.filenameFor(Component.COMPACTED_MARKER));
+
         logger.debug("Deleted {}", desc);
         return true;
     }
@@ -196,7 +189,7 @@ public abstract class SSTable
     /**
      * Discovers existing components for the descriptor. Slow: only intended 
for use outside the critical path.
      */
-    static Set<Component> componentsFor(final Descriptor desc, final boolean 
liveOnly)
+    static Set<Component> componentsFor(final Descriptor desc, final 
Descriptor.TempState matchState)
     {
         final Set<Component> components = new HashSet<Component>();
         desc.directory.list(new FilenameFilter()
@@ -204,7 +197,7 @@ public abstract class SSTable
             public boolean accept(File dir, String name)
             {
                 Pair<Descriptor,Component> component = 
tryComponentFromFilename(dir, name);
-                if (component != null && component.left.equals(desc) && 
(!liveOnly || !component.left.temporary))
+                if (component != null && component.left.equals(desc) && 
(matchState.isMatch(component.left)))
                     components.add(component.right);
                 return false;
             }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingReference.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingReference.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- 
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingReference.java
 (original)
+++ 
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableDeletingReference.java
 Thu Jun 23 05:49:35 2011
@@ -20,6 +20,7 @@
 package org.apache.cassandra.io.sstable;
 
 import java.io.File;
+import java.io.IOError;
 import java.io.IOException;
 import java.lang.ref.PhantomReference;
 import java.lang.ref.ReferenceQueue;
@@ -94,7 +95,15 @@ public class SSTableDeletingReference ex
                 }
             }
             // let the remainder be cleaned up by delete
-            SSTable.delete(desc, Sets.difference(components, 
Collections.singleton(Component.DATA)));
+            try
+            {
+                SSTable.delete(desc, Sets.difference(components, 
Collections.singleton(Component.DATA)));
+            }
+            catch (IOException e)
+            {
+                throw new IOError(e);
+            }
+
             tracker.spaceReclaimed(size);
         }
     }

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableReader.java 
Thu Jun 23 05:49:35 2011
@@ -138,7 +138,7 @@ public class SSTableReader extends SSTab
 
     public static SSTableReader open(Descriptor desc) throws IOException
     {
-        Set<Component> components = componentsFor(desc, false);
+        Set<Component> components = componentsFor(desc, 
Descriptor.TempState.ANY);
         return open(desc, components, 
DatabaseDescriptor.getCFMetaData(desc.ksname, desc.cfname), 
StorageService.getPartitioner());
     }
 

Modified: 
cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java 
(original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java 
Thu Jun 23 05:49:35 2011
@@ -168,6 +168,26 @@ public class SSTableWriter extends SSTab
         afterAppend(decoratedKey, currentPosition);
     }
 
+    /**
+     * Attempt to close the index writer and data file before deleting all 
temp components for the sstable
+     */
+    public void cleanupIfNecessary()
+    {
+        FileUtils.closeQuietly(iwriter);
+        FileUtils.closeQuietly(dataFile);
+
+        try
+        {
+            Set<Component> components = SSTable.componentsFor(descriptor, 
Descriptor.TempState.TEMP);
+            if (!components.isEmpty())
+                SSTable.delete(descriptor, components);
+        }
+        catch (Exception e)
+        {
+            logger.error(String.format("Failed deleting temp components for 
%s", descriptor), e);
+        }
+    }
+
     public SSTableReader closeAndOpenReader() throws IOException
     {
         return closeAndOpenReader(System.currentTimeMillis());
@@ -300,26 +320,53 @@ public class SSTableWriter extends SSTab
 
         public SSTableReader build() throws IOException
         {
-            if (cfs.isInvalid())
-                return null;
-            maybeOpenIndexer();
+            try
+            {
+                if (cfs.isInvalid())
+                    return null;
+                maybeOpenIndexer();
+
+                File ifile = new 
File(desc.filenameFor(SSTable.COMPONENT_INDEX));
+                File ffile = new 
File(desc.filenameFor(SSTable.COMPONENT_FILTER));
+                assert !ifile.exists();
+                assert !ffile.exists();
 
-            File ifile = new File(desc.filenameFor(SSTable.COMPONENT_INDEX));
-            File ffile = new File(desc.filenameFor(SSTable.COMPONENT_FILTER));
-            assert !ifile.exists();
-            assert !ffile.exists();
+                long estimatedRows = indexer.prepareIndexing();
 
-            long estimatedRows = indexer.prepareIndexing();
+                // build the index and filter
+                long rows = indexer.index();
+
+                logger.debug("estimated row count was {} of real count", 
((double)estimatedRows) / rows);
+                return SSTableReader.open(rename(desc, 
SSTable.componentsFor(desc, Descriptor.TempState.ANY)));
+            }
+            finally
+            {
+                cleanupIfNecessary();
+            }
+        }
 
-            // build the index and filter
-            long rows = indexer.index();
+        /**
+        * Attempt to close the index writer before deleting all temp 
components for the sstable
+        */
+        public void cleanupIfNecessary()
+        {
+            FileUtils.closeQuietly(indexer);
 
-            logger.debug("estimated row count was {} of real count", 
((double)estimatedRows) / rows);
-            return SSTableReader.open(rename(desc, SSTable.componentsFor(desc, 
false)));
+            try
+            {
+                Set<Component> components = SSTable.componentsFor(desc, 
Descriptor.TempState.TEMP);
+                if (!components.isEmpty())
+                    SSTable.delete(desc, components);
+            }
+            catch (Exception e)
+            {
+                logger.error(String.format("Failed deleting temp components 
for %s", desc), e);
+            }
         }
+
     }
 
-    static class RowIndexer
+    static class RowIndexer implements Closeable
     {
         protected final Descriptor desc;
         public final BufferedRandomAccessFile dfile;
@@ -376,7 +423,7 @@ public class SSTableWriter extends SSTab
             }
         }
 
-        void close() throws IOException
+        public void close() throws IOException
         {
             dfile.close();
             iwriter.close();
@@ -465,6 +512,11 @@ public class SSTableWriter extends SSTab
             writeMetadata(desc, rowSizes, columnCounts, ReplayPosition.NONE);
             return rows;
         }
+
+        public String toString()
+        {
+            return "RowIndexer(" + desc + ")";
+        }
     }
 
     /*
@@ -533,7 +585,7 @@ public class SSTableWriter extends SSTab
         }
 
         @Override
-        void close() throws IOException
+        public void close() throws IOException
         {
             super.close();
             writerDfile.close();
@@ -543,7 +595,7 @@ public class SSTableWriter extends SSTab
     /**
      * Encapsulates writing the index and filter for an SSTable. The state of this object is not valid until it has been closed.
      */
-    static class IndexWriter
+    static class IndexWriter implements Closeable
     {
         private final BufferedRandomAccessFile indexFile;
         public final Descriptor desc;
@@ -610,5 +662,10 @@ public class SSTableWriter extends SSTab
             // we assume that if that worked then we won't be trying to reset.
             indexFile.reset(mark);
         }
+
+        public String toString()
+        {
+            return "IndexWriter(" + desc + ")";
+        }
     }
 }

Modified: cassandra/trunk/src/java/org/apache/cassandra/io/util/FileUtils.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/io/util/FileUtils.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/src/java/org/apache/cassandra/io/util/FileUtils.java (original)
+++ cassandra/trunk/src/java/org/apache/cassandra/io/util/FileUtils.java Thu Jun 23 05:49:35 2011
@@ -92,7 +92,7 @@ public class FileUtils
         }
         catch (Exception e)
         {
-            logger_.warn("Failed closing stream", e);
+            logger_.warn("Failed closing " + c, e);
         }
     }
 

Modified: 
cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java
URL: 
http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java?rev=1138740&r1=1138739&r2=1138740&view=diff
==============================================================================
--- cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java (original)
+++ cassandra/trunk/test/unit/org/apache/cassandra/io/sstable/SSTableTest.java Thu Jun 23 05:49:35 2011
@@ -75,9 +75,9 @@ public class SSTableTest extends Cleanup
         ssTable = SSTableReader.open(ssTable.descriptor); // read the index from disk
         verifyMany(ssTable, map);
 
-        Set<Component> live = SSTable.componentsFor(ssTable.descriptor, true);
+        Set<Component> live = SSTable.componentsFor(ssTable.descriptor, Descriptor.TempState.LIVE);
         assert !live.isEmpty() : "SSTable has live components";
-        Set<Component> all = SSTable.componentsFor(ssTable.descriptor, false);
+        Set<Component> all = SSTable.componentsFor(ssTable.descriptor, Descriptor.TempState.ANY);
         assert live.equals(all) : "live components same as all components";
         all.removeAll(live);
         assert all.isEmpty() : "SSTable has no temp components";


Reply via email to