Remove repair snapshot leftover on startup

patch by Paulo Motta; reviewed by yukim for CASSANDRA-7357


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/b70f7ea0
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/b70f7ea0
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/b70f7ea0

Branch: refs/heads/trunk
Commit: b70f7ea0ce27b5defa0a7773d448732364e7aee0
Parents: e726cf6
Author: Paulo Motta <pauloricard...@gmail.com>
Authored: Tue Jul 21 22:11:37 2015 -0500
Committer: Yuki Morishita <yu...@apache.org>
Committed: Tue Jul 21 22:11:37 2015 -0500

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/db/ColumnFamilyStore.java  | 54 +++++++++++++++--
 .../org/apache/cassandra/db/Directories.java    | 63 +++++++++++++++-----
 .../repair/RepairMessageVerbHandler.java        |  2 +-
 .../cassandra/service/CassandraDaemon.java      |  1 -
 .../cassandra/db/ColumnFamilyStoreTest.java     | 42 +++++++++++++
 6 files changed, 139 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/b70f7ea0/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index c6774c2..26ee348 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -4,6 +4,7 @@
  * Fix clientutil jar and tests (CASSANDRA-9760)
  * (cqlsh) Allow the SSL protocol version to be specified through the
    config file or environment variables (CASSANDRA-9544)
+ * Remove repair snapshot leftover on startup (CASSANDRA-7357)
 Merged from 2.0:
  * Complete CASSANDRA-8448 fix (CASSANDRA-9519)
  * Don't include auth credentials in debug log (CASSANDRA-9682)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b70f7ea0/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java 
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index aec5f35..20e74dc 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.db;
 import java.io.*;
 import java.lang.management.ManagementFactory;
 import java.nio.ByteBuffer;
+import java.nio.file.Files;
 import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -35,7 +36,6 @@ import com.google.common.collect.*;
 import com.google.common.util.concurrent.*;
 
 import org.apache.cassandra.io.FSWriteError;
-import org.apache.cassandra.utils.memory.MemtablePool;
 import org.json.simple.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -501,6 +501,9 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
     {
         Directories directories = new Directories(metadata);
 
+        // clear ephemeral snapshots that were not properly cleared last 
session (CASSANDRA-7357)
+        clearEphemeralSnapshots(directories);
+
         // remove any left-behind SSTables from failed/stalled streaming
         FileFilter filter = new FileFilter()
         {
@@ -2249,10 +2252,13 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
 
     public void snapshotWithoutFlush(String snapshotName)
     {
-        snapshotWithoutFlush(snapshotName, null);
+        snapshotWithoutFlush(snapshotName, null, false);
     }
 
-    public void snapshotWithoutFlush(String snapshotName, 
Predicate<SSTableReader> predicate)
+    /**
+     * @param ephemeral If this flag is set to true, the snapshot will be 
cleaned during next startup
+     */
+    public void snapshotWithoutFlush(String snapshotName, 
Predicate<SSTableReader> predicate, boolean ephemeral)
     {
         for (ColumnFamilyStore cfs : concatWithIndexes())
         {
@@ -2267,6 +2273,7 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
                     File snapshotDirectory = 
Directories.getSnapshotDirectory(ssTable.descriptor, snapshotName);
                     ssTable.createLinks(snapshotDirectory.getPath()); // hard 
links
                     
filesJSONArr.add(ssTable.descriptor.relativeFilenameFor(Component.DATA));
+
                     if (logger.isDebugEnabled())
                         logger.debug("Snapshot for {} keyspace data file {} 
created in {}", keyspace, ssTable.getFilename(), snapshotDirectory);
                 }
@@ -2274,6 +2281,8 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
                 writeSnapshotManifest(filesJSONArr, snapshotName);
             }
         }
+        if (ephemeral)
+            createEphemeralSnapshotMarkerFile(snapshotName);
     }
 
     private void writeSnapshotManifest(final JSONArray filesJSONArr, final 
String snapshotName)
@@ -2296,6 +2305,36 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
         }
     }
 
+    private void createEphemeralSnapshotMarkerFile(final String snapshot)
+    {
+        final File ephemeralSnapshotMarker = 
directories.getNewEphemeralSnapshotMarkerFile(snapshot);
+
+        try
+        {
+            if (!ephemeralSnapshotMarker.getParentFile().exists())
+                ephemeralSnapshotMarker.getParentFile().mkdirs();
+
+            Files.createFile(ephemeralSnapshotMarker.toPath());
+            logger.debug("Created ephemeral snapshot marker file on {}.", 
ephemeralSnapshotMarker.getAbsolutePath());
+        }
+        catch (IOException e)
+        {
+            logger.warn(String.format("Could not create marker file %s for 
ephemeral snapshot %s. " +
+                                      "In case there is a failure in the 
operation that created " +
+                                      "this snapshot, you may need to clean it 
manually afterwards.",
+                                      
ephemeralSnapshotMarker.getAbsolutePath(), snapshot), e);
+        }
+    }
+
+    protected static void clearEphemeralSnapshots(Directories directories)
+    {
+        for (String ephemeralSnapshot : directories.listEphemeralSnapshots())
+        {
+            logger.debug("Clearing ephemeral snapshot {} leftover from 
previous session.", ephemeralSnapshot);
+            Directories.clearSnapshot(ephemeralSnapshot, 
directories.getCFDirectories());
+        }
+    }
+
     public Refs<SSTableReader> getSnapshotSSTableReader(String tag) throws 
IOException
     {
         Map<Integer, SSTableReader> active = new HashMap<>();
@@ -2341,13 +2380,16 @@ public class ColumnFamilyStore implements 
ColumnFamilyStoreMBean
      */
     public void snapshot(String snapshotName)
     {
-        snapshot(snapshotName, null);
+        snapshot(snapshotName, null, false);
     }
 
-    public void snapshot(String snapshotName, Predicate<SSTableReader> 
predicate)
+    /**
+     * @param ephemeral If this flag is set to true, the snapshot will be 
cleaned up during next startup
+     */
+    public void snapshot(String snapshotName, Predicate<SSTableReader> 
predicate, boolean ephemeral)
     {
         forceBlockingFlush();
-        snapshotWithoutFlush(snapshotName, predicate);
+        snapshotWithoutFlush(snapshotName, predicate, ephemeral);
     }
 
     public boolean snapshotExists(String snapshotName)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b70f7ea0/src/java/org/apache/cassandra/db/Directories.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Directories.java 
b/src/java/org/apache/cassandra/db/Directories.java
index 2e0b60c..810c336 100644
--- a/src/java/org/apache/cassandra/db/Directories.java
+++ b/src/java/org/apache/cassandra/db/Directories.java
@@ -376,6 +376,17 @@ public class Directories
          return new File(getDirectoryForNewSSTables(), join(SNAPSHOT_SUBDIR, 
snapshotName, "manifest.json"));
     }
 
+    public File getNewEphemeralSnapshotMarkerFile(String snapshotName)
+    {
+        File snapshotDir = new File(getWriteableLocationAsFile(1L), 
join(SNAPSHOT_SUBDIR, snapshotName));
+        return getEphemeralSnapshotMarkerFile(snapshotDir);
+    }
+
+    private static File getEphemeralSnapshotMarkerFile(File snapshotDirectory)
+    {
+        return new File(snapshotDirectory, "ephemeral.snapshot");
+    }
+
     public static File getBackupsDirectory(Descriptor desc)
     {
         return getOrCreate(desc.directory, BACKUPS_SUBDIR);
@@ -563,34 +574,55 @@ public class Directories
     public Map<String, Pair<Long, Long>> getSnapshotDetails()
     {
         final Map<String, Pair<Long, Long>> snapshotSpaceMap = new HashMap<>();
+        for (File snapshot : listSnapshots())
+        {
+            final long sizeOnDisk = FileUtils.folderSize(snapshot);
+            final long trueSize = getTrueAllocatedSizeIn(snapshot);
+            Pair<Long, Long> spaceUsed = 
snapshotSpaceMap.get(snapshot.getName());
+            if (spaceUsed == null)
+                spaceUsed =  Pair.create(sizeOnDisk,trueSize);
+            else
+                spaceUsed = Pair.create(spaceUsed.left + sizeOnDisk, 
spaceUsed.right + trueSize);
+            snapshotSpaceMap.put(snapshot.getName(), spaceUsed);
+        }
+        return snapshotSpaceMap;
+    }
+
+
+    public List<String> listEphemeralSnapshots()
+    {
+        final List<String> ephemeralSnapshots = new LinkedList<>();
+        for (File snapshot : listSnapshots())
+        {
+            if (getEphemeralSnapshotMarkerFile(snapshot).exists())
+                ephemeralSnapshots.add(snapshot.getName());
+        }
+        return ephemeralSnapshots;
+    }
+
+    private List<File> listSnapshots()
+    {
+        final List<File> snapshots = new LinkedList<>();
         for (final File dir : dataPaths)
         {
             final File snapshotDir = new File(dir,SNAPSHOT_SUBDIR);
             if (snapshotDir.exists() && snapshotDir.isDirectory())
             {
-                final File[] snapshots  = snapshotDir.listFiles();
-                if (snapshots != null)
+                final File[] snapshotDirs  = snapshotDir.listFiles();
+                if (snapshotDirs != null)
                 {
-                    for (final File snapshot : snapshots)
+                    for (final File snapshot : snapshotDirs)
                     {
                         if (snapshot.isDirectory())
-                        {
-                            final long sizeOnDisk = 
FileUtils.folderSize(snapshot);
-                            final long trueSize = 
getTrueAllocatedSizeIn(snapshot);
-                            Pair<Long,Long> spaceUsed = 
snapshotSpaceMap.get(snapshot.getName());
-                            if (spaceUsed == null)
-                                spaceUsed =  Pair.create(sizeOnDisk,trueSize);
-                            else
-                                spaceUsed = Pair.create(spaceUsed.left + 
sizeOnDisk, spaceUsed.right + trueSize);
-                            snapshotSpaceMap.put(snapshot.getName(), 
spaceUsed);
-                        }
+                            snapshots.add(snapshot);
                     }
                 }
             }
         }
 
-        return snapshotSpaceMap;
+        return snapshots;
     }
+
     public boolean snapshotExists(String snapshotName)
     {
         for (File dir : dataPaths)
@@ -611,8 +643,7 @@ public class Directories
             File snapshotDir = new File(dir, join(SNAPSHOT_SUBDIR, tag));
             if (snapshotDir.exists())
             {
-                if (logger.isDebugEnabled())
-                    logger.debug("Removing snapshot directory {}", 
snapshotDir);
+                logger.debug("Removing snapshot directory {}", snapshotDir);
                 FileUtils.deleteRecursive(snapshotDir);
             }
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b70f7ea0/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java 
b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 872978e..fd4ac28 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -87,7 +87,7 @@ public class RepairMessageVerbHandler implements 
IVerbHandler<RepairMessage>
                                     !(sstable.partitioner instanceof 
LocalPartitioner) && // exclude SSTables from 2i
                                     new Bounds<>(sstable.first.getToken(), 
sstable.last.getToken()).intersects(Collections.singleton(repairingRange));
                         }
-                    });
+                    }, true); //ephemeral snapshot, if repair fails, it will 
be cleaned next startup
 
                     logger.debug("Enqueuing response to snapshot request {} to 
{}", desc.sessionId, message.from);
                     MessagingService.instance().sendReply(new 
MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b70f7ea0/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java 
b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 949ea4c..2c141a6 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -49,7 +49,6 @@ import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.config.Schema;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.commitlog.CommitLog;
-import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.io.FSError;
 import org.apache.cassandra.io.sstable.CorruptSSTableException;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/b70f7ea0/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java 
b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
index 5faab78..35814f0 100644
--- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
@@ -1441,6 +1441,48 @@ public class ColumnFamilyStoreTest extends SchemaLoader
         findRowGetSlicesAndAssertColsFound(cfs, multiRangeReverseWithCounting, 
"a", "colI", "colD", "colC");
     }
 
+    @Test
+    public void testClearEphemeralSnapshots() throws Throwable
+    {
+        Mutation rm;
+        ColumnFamilyStore cfs = 
Keyspace.open("Keyspace3").getColumnFamilyStore("Indexed1");
+        for (int i = 0; i < 100; i++)
+        {
+            rm = new Mutation("Keyspace3", ByteBufferUtil.bytes("key" + i));
+            rm.add("Indexed1", cellname("birthdate"), 
ByteBufferUtil.bytes(34L), 0);
+            rm.add("Indexed1", cellname("notbirthdate"), 
ByteBufferUtil.bytes((long) (i % 2)), 0);
+            rm.applyUnsafe();
+        }
+
+        //cleanup any previous test garbage
+        cfs.clearSnapshot("");
+
+        Cell[] cols = new Cell[5];
+        for (int i = 0; i < 5; i++)
+            cols[i] = column("c" + i, "value", 1);
+
+        putColsStandard(cfs, Util.dk("a"), cols[0], cols[1], cols[2], cols[3], 
cols[4]);
+        putColsStandard(cfs, Util.dk("b"), cols[0], cols[1]);
+        putColsStandard(cfs, Util.dk("c"), cols[0], cols[1], cols[2], cols[3]);
+
+        cfs.snapshot("nonEphemeralSnapshot", null, false);
+        cfs.snapshot("ephemeralSnapshot", null, true);
+
+        Map<String, Pair<Long, Long>> snapshotDetails = 
cfs.getSnapshotDetails();
+        assertEquals(2, snapshotDetails.size());
+        assertTrue(snapshotDetails.containsKey("ephemeralSnapshot"));
+        assertTrue(snapshotDetails.containsKey("nonEphemeralSnapshot"));
+
+        ColumnFamilyStore.clearEphemeralSnapshots(cfs.directories);
+
+        snapshotDetails = cfs.getSnapshotDetails();
+        assertEquals(1, snapshotDetails.size());
+        assertTrue(snapshotDetails.containsKey("nonEphemeralSnapshot"));
+
+        //test cleanup
+        cfs.clearSnapshot("");
+    }
+
     @SuppressWarnings("unchecked")
     @Test
     public void testMultiRangeSomeEmptyIndexed() throws Throwable

Reply via email to