Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 d2309227c -> 5160c916c


Make sure unfinished compaction files are removed.

Patch by marcuse; reviewed by yukim for CASSANDRA-8124


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/9c316e78
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/9c316e78
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/9c316e78

Branch: refs/heads/cassandra-2.1
Commit: 9c316e7858f6dbf9df892aff78431044aa104ed9
Parents: d230922
Author: Marcus Eriksson <marc...@apache.org>
Authored: Fri Oct 17 14:15:46 2014 +0200
Committer: Marcus Eriksson <marc...@apache.org>
Committed: Mon Nov 3 16:17:01 2014 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/io/sstable/SSTableReader.java     |   6 +
 .../cassandra/io/sstable/SSTableRewriter.java   |  90 +++-
 .../io/sstable/SSTableRewriterTest.java         | 473 +++++++++++++++++++
 4 files changed, 555 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c316e78/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 494fb93..681d616 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.2
+ * Make sure unfinished compaction files are removed (CASSANDRA-8124)
  * Fix shutdown when run as Windows service (CASSANDRA-8136)
  * Fix DESCRIBE TABLE with custom indexes (CASSANDRA-8031)
  * Fix race in RecoveryManagerTest (CASSANDRA-8176)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c316e78/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
index 872f7df..40e708d 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableReader.java
@@ -1599,6 +1599,12 @@ public class SSTableReader extends SSTable
         }
     }
 
+    @VisibleForTesting
+    int referenceCount()
+    {
+        return references.get();
+    }
+
     /**
      * Release reference to this SSTableReader.
      * If there is no one referring to this SSTable, and is marked as compacted,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c316e78/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
index 76677ac..2c9fe7e 100644
--- a/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
+++ b/src/java/org/apache/cassandra/io/sstable/SSTableRewriter.java
@@ -25,8 +25,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Functions;
+import com.google.common.collect.Lists;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
@@ -37,6 +39,7 @@ import org.apache.cassandra.db.compaction.AbstractCompactedRow;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.utils.CLibrary;
 import org.apache.cassandra.utils.FBUtilities;
+import org.apache.cassandra.utils.Pair;
 
 /**
  * Wraps one or more writers as output for rewriting one or more readers: every sstable_preemptive_open_interval_in_mb
@@ -55,8 +58,7 @@ import org.apache.cassandra.utils.FBUtilities;
  */
 public class SSTableRewriter
 {
-
-    private static final long preemptiveOpenInterval;
+    private static long preemptiveOpenInterval;
     static
     {
         long interval = DatabaseDescriptor.getSSTablePreempiveOpenIntervalInMB() * (1L << 20);
@@ -65,6 +67,14 @@ public class SSTableRewriter
         preemptiveOpenInterval = interval;
     }
 
+    private boolean isFinished = false;
+
+    @VisibleForTesting
+    static void overrideOpenInterval(long size)
+    {
+        preemptiveOpenInterval = size;
+    }
+
     private final DataTracker dataTracker;
     private final ColumnFamilyStore cfs;
 
@@ -77,6 +87,8 @@ public class SSTableRewriter
     private long currentlyOpenedEarlyAt; // the position (in MB) in the target file we last (re)opened at
 
     private final List<SSTableReader> finished = new ArrayList<>(); // the resultant sstables
+    private final List<SSTableReader> finishedOpenedEarly = new ArrayList<>(); // the 'finished' tmplink sstables
+    private final List<Pair<SSTableWriter, SSTableReader>> finishedWriters = new ArrayList<>();
     private final OperationType rewriteType; // the type of rewrite/compaction being performed
     private final boolean isOffline; // true for operations that are performed without Cassandra running (prevents updates of DataTracker)
 
@@ -187,20 +199,32 @@ public class SSTableRewriter
     {
         if (writer == null)
             return;
+
+        switchWriter(null);
+
         moveStarts(null, Functions.forMap(originalStarts), true);
-        List<SSTableReader> close = new ArrayList<>(finished);
+
+        List<SSTableReader> close = Lists.newArrayList(finishedOpenedEarly);
         if (currentlyOpenedEarly != null)
             close.add(currentlyOpenedEarly);
+
+        for (Pair<SSTableWriter, SSTableReader> w : finishedWriters)
+        {
+            // we should close the bloom filter if we have not opened an sstable reader from this
+            // writer (it will get closed when we release the sstable reference below):
+            w.left.abort(w.right == null);
+        }
+
         // also remove already completed SSTables
         for (SSTableReader sstable : close)
             sstable.markObsolete();
+
         // releases reference in replaceReaders
         if (!isOffline)
         {
             dataTracker.replaceReaders(close, Collections.<SSTableReader>emptyList(), false);
             dataTracker.unmarkCompacting(close);
         }
-        writer.abort(currentlyOpenedEarly == null);
     }
 
     /**
@@ -208,6 +232,11 @@ public class SSTableRewriter
      * needed, and transferring any key cache entries over to the new reader, expiring them from the old. if reset
      * is true, we are instead restoring the starts of the readers from before the rewriting began
      *
+     * note that we replace an existing sstable with a new *instance* of the same sstable, the replacement
+     * sstable .equals() the old one, BUT, it is a new instance, so, for example, since we releaseReference() on the old
+     * one, the old *instance* will have reference count == 0 and if we were to start a new compaction with that old
+     * instance, we would get exceptions.
+     *
      * @param newReader the rewritten reader that replaces them for this region
      * @param newStarts a function mapping a reader's descriptor to their new start value
      * @param reset true iff we are restoring earlier starts (increasing the range over which they are valid)
@@ -284,11 +313,15 @@ public class SSTableRewriter
             writer = newWriter;
             return;
         }
-        // tmp = false because later we want to query it with descriptor from SSTableReader
-        SSTableReader reader = writer.closeAndOpenReader(maxAge);
-        finished.add(reader);
-        replaceReader(currentlyOpenedEarly, reader, false);
-        moveStarts(reader, Functions.constant(reader.last), false);
+        // we leave it as a tmp file, but we open it early and add it to the dataTracker
+        SSTableReader reader = writer.openEarly(maxAge);
+        if (reader != null)
+        {
+            finishedOpenedEarly.add(reader);
+            replaceReader(currentlyOpenedEarly, reader, false);
+            moveStarts(reader, Functions.constant(reader.last), false);
+        }
+        finishedWriters.add(Pair.create(writer, reader));
         currentlyOpenedEarly = null;
         currentlyOpenedEarlyAt = 0;
         writer = newWriter;
@@ -306,23 +339,48 @@ public class SSTableRewriter
     {
         finish(cleanupOldReaders, -1);
     }
+
+    /**
+     * Finishes the new file(s)
+     *
+     * Creates final files, adds the new files to the dataTracker (via replaceReader) but only marks the
+     * old files as compacted if cleanupOldReaders is set to true. Otherwise it is up to the caller to do those gymnastics
+     * (ie, call DataTracker#markCompactedSSTablesReplaced(..))
+     *
+     * @param cleanupOldReaders if we should replace the old files with the new ones
+     * @param repairedAt the repair time, -1 if we should use the time we supplied when we created
+     *                   the SSTableWriter (and called rewriter.switchWriter(..)), actual time if we want to override the
+     *                   repair time.
+     */
     public void finish(boolean cleanupOldReaders, long repairedAt)
     {
         if (writer.getFilePointer() > 0)
         {
-            SSTableReader reader = repairedAt < 0 ?
-                                    writer.closeAndOpenReader(maxAge) :
-                                    writer.closeAndOpenReader(maxAge, repairedAt);
+            SSTableReader reader = repairedAt < 0 ? writer.closeAndOpenReader(maxAge) : writer.closeAndOpenReader(maxAge, repairedAt);
             finished.add(reader);
             replaceReader(currentlyOpenedEarly, reader, false);
             moveStarts(reader, Functions.constant(reader.last), false);
         }
         else
         {
-            writer.abort();
-            writer = null;
+            writer.abort(true);
+        }
+        // make real sstables of the written ones:
+        for (Pair<SSTableWriter, SSTableReader> w : finishedWriters)
+        {
+            if (w.left.getFilePointer() > 0)
+            {
+                SSTableReader newReader = repairedAt < 0 ? w.left.closeAndOpenReader(maxAge) : w.left.closeAndOpenReader(maxAge, repairedAt);
+                finished.add(newReader);
+                // w.right is the tmplink-reader we added when switching writer, replace with the real sstable.
+                replaceReader(w.right, newReader, false);
+            }
+            else
+            {
+                assert w.right == null;
+                w.left.abort(true);
+            }
         }
-
         if (!isOffline)
         {
             dataTracker.unmarkCompacting(finished);
@@ -337,10 +395,12 @@ public class SSTableRewriter
                 reader.releaseReference();
             }
         }
+        isFinished = true;
     }
 
     public List<SSTableReader> finished()
     {
+        assert isFinished;
         return finished;
     }
 }
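
For context, the caller-side contract described in the new finish() javadoc looks roughly like the sketch below (illustrative only, not part of the patch; it assumes the same setup as SSTableRewriterTest further down -- cfs, compacting, scanner, controller and getWriter -- and mirrors testNumberOfFiles_dont_clean_readers):

    // minimal sketch of the finish(false) contract added above
    SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
    while (scanner.hasNext())
        rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
    // finish(false) opens the new sstables and adds them to the DataTracker,
    // but leaves marking the old ones compacted to the caller:
    rewriter.finish(false);
    cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, rewriter.finished(), OperationType.COMPACTION);

After the replacement, validateCFS() in the test below uses the new SSTableReader#referenceCount() accessor to check that each remaining sstable is back to a single reference, which is the situation the new comment on moveStarts() is guarding against.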

http://git-wip-us.apache.org/repos/asf/cassandra/blob/9c316e78/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
new file mode 100644
index 0000000..8b203ac
--- /dev/null
+++ b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java
@@ -0,0 +1,473 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.io.sstable;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+import com.google.common.collect.Sets;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ArrayBackedSortedColumns;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
+import org.apache.cassandra.db.compaction.CompactionController;
+import org.apache.cassandra.db.compaction.ICompactionScanner;
+import org.apache.cassandra.db.compaction.LazilyCompactedRow;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.io.sstable.metadata.MetadataCollector;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.ByteBufferUtil;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SSTableRewriterTest extends SchemaLoader
+{
+    private static final String KEYSPACE = "Keyspace1";
+    private static final String CF = "Standard1";
+    @Test
+    public void basicTest() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+        for (int j = 0; j < 100; j ++)
+        {
+            ByteBuffer key = ByteBufferUtil.bytes(String.valueOf(j));
+            Mutation rm = new Mutation(KEYSPACE, key);
+            rm.add(CF, Util.cellname("0"), ByteBufferUtil.EMPTY_BYTE_BUFFER, j);
+            rm.apply();
+        }
+        cfs.forceBlockingFlush();
+        Set<SSTableReader> sstables = new HashSet<>(cfs.getSSTables());
+        assertEquals(1, sstables.size());
+        SSTableRewriter writer = new SSTableRewriter(cfs, sstables, 1000, OperationType.COMPACTION, false);
+        AbstractCompactionStrategy.ScannerList scanners = cfs.getCompactionStrategy().getScanners(sstables);
+        ICompactionScanner scanner = scanners.scanners.get(0);
+        CompactionController controller = new CompactionController(cfs, sstables, cfs.gcBefore(System.currentTimeMillis()));
+        writer.switchWriter(getWriter(cfs, sstables.iterator().next().descriptor.directory));
+        while(scanner.hasNext())
+        {
+            AbstractCompactedRow row = new LazilyCompactedRow(controller, Arrays.asList(scanner.next()));
+            writer.append(row);
+        }
+        writer.finish();
+
+        validateCFS(cfs);
+
+    }
+
+
+    @Test
+    public void testFileRemoval() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+        ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+        for (int i = 0; i < 1000; i++)
+            cf.addColumn(Util.column(String.valueOf(i), "a", 1));
+        File dir = cfs.directories.getDirectoryForNewSSTables();
+        SSTableWriter writer = getWriter(cfs, dir);
+
+        for (int i = 0; i < 500; i++)
+            writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+        SSTableReader s = writer.openEarly(1000);
+        assertFileCounts(dir.list(), 2, 3);
+        for (int i = 500; i < 1000; i++)
+            writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+        SSTableReader s2 = writer.openEarly(1000);
+        assertTrue(s != s2);
+        assertFileCounts(dir.list(), 2, 3);
+        s.markObsolete();
+        s.releaseReference();
+        Thread.sleep(1000);
+        assertFileCounts(dir.list(), 0, 3);
+        writer.abort(false);
+        Thread.sleep(1000);
+        assertFileCounts(dir.list(), 0, 0);
+        validateCFS(cfs);
+    }
+
+    @Test
+    public void testFileRemovalNoAbort() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+        ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+        for (int i = 0; i < 1000; i++)
+            cf.addColumn(Util.column(String.valueOf(i), "a", 1));
+        File dir = cfs.directories.getDirectoryForNewSSTables();
+        SSTableWriter writer = getWriter(cfs, dir);
+
+        for (int i = 0; i < 500; i++)
+            writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+        SSTableReader s = writer.openEarly(1000);
+        //assertFileCounts(dir.list(), 2, 3);
+        for (int i = 500; i < 1000; i++)
+            writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+        writer.closeAndOpenReader();
+        s.markObsolete();
+        s.releaseReference();
+        Thread.sleep(1000);
+        assertFileCounts(dir.list(), 0, 0);
+        validateCFS(cfs);
+    }
+
+
+    @Test
+    public void testNumberOfFiles() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+
+        SSTableReader s = writeFile(cfs, 1000);
+        cfs.addSSTable(s);
+
+        Set<SSTableReader> compacting = Sets.newHashSet(s);
+        SSTableRewriter.overrideOpenInterval(10000000);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+        ICompactionScanner scanner = s.getScanner();
+        CompactionController controller = new CompactionController(cfs, compacting, 0);
+        int files = 1;
+        while(scanner.hasNext())
+        {
+            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+            {
+                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                files++;
+                assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+            }
+        }
+        rewriter.finish();
+        assertEquals(files, rewriter.finished().size());
+        assertEquals(files, cfs.getSSTables().size());
+        Thread.sleep(1000);
+        // tmplink and tmp files should be gone:
+        assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        validateCFS(cfs);
+    }
+
+    @Test
+    public void testNumberOfFiles_dont_clean_readers() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+
+        SSTableReader s = writeFile(cfs, 1000);
+        cfs.addSSTable(s);
+
+        Set<SSTableReader> compacting = Sets.newHashSet(s);
+        SSTableRewriter.overrideOpenInterval(10000000);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+        ICompactionScanner scanner = s.getScanner();
+        CompactionController controller = new CompactionController(cfs, compacting, 0);
+        int files = 1;
+        while(scanner.hasNext())
+        {
+            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+            {
+                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                files++;
+                assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+            }
+        }
+        rewriter.finish(false);
+        assertEquals(files, rewriter.finished().size());
+        assertEquals(files + 1, cfs.getSSTables().size());
+        cfs.getDataTracker().markCompactedSSTablesReplaced(compacting, rewriter.finished(), OperationType.COMPACTION);
+        assertEquals(files, cfs.getSSTables().size());
+        Thread.sleep(1000);
+        assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        validateCFS(cfs);
+    }
+
+
+    @Test
+    public void testNumberOfFiles_abort() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+
+        SSTableReader s = writeFile(cfs, 1000);
+        cfs.addSSTable(s);
+        DecoratedKey origFirst = s.first;
+        DecoratedKey origLast = s.last;
+        Set<SSTableReader> compacting = Sets.newHashSet(s);
+        SSTableRewriter.overrideOpenInterval(10000000);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+        ICompactionScanner scanner = s.getScanner();
+        CompactionController controller = new CompactionController(cfs, compacting, 0);
+        int files = 1;
+        while(scanner.hasNext())
+        {
+            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+            {
+                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                files++;
+                assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+            }
+        }
+        rewriter.abort();
+        Thread.sleep(1000);
+        assertEquals(1, cfs.getSSTables().size());
+        assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
+        assertEquals(cfs.getSSTables().iterator().next().last, origLast);
+        validateCFS(cfs);
+
+    }
+
+    @Test
+    public void testNumberOfFiles_abort2() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+
+        SSTableReader s = writeFile(cfs, 1000);
+        cfs.addSSTable(s);
+
+        DecoratedKey origFirst = s.first;
+        DecoratedKey origLast = s.last;
+        Set<SSTableReader> compacting = Sets.newHashSet(s);
+        SSTableRewriter.overrideOpenInterval(10000000);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+        ICompactionScanner scanner = s.getScanner();
+        CompactionController controller = new CompactionController(cfs, compacting, 0);
+        int files = 1;
+        while(scanner.hasNext())
+        {
+            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+            {
+                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                files++;
+                assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+            }
+            if (files == 3)
+            {
+                //testing to abort when we have nothing written in the new file
+                rewriter.abort();
+                break;
+            }
+        }
+        Thread.sleep(1000);
+        assertEquals(1, cfs.getSSTables().size());
+        assertFileCounts(s.descriptor.directory.list(), 0, 0);
+
+        assertEquals(cfs.getSSTables().iterator().next().first, origFirst);
+        assertEquals(cfs.getSSTables().iterator().next().last, origLast);
+        validateCFS(cfs);
+    }
+
+    @Test
+    public void testNumberOfFiles_finish_empty_new_writer() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+
+        SSTableReader s = writeFile(cfs, 1000);
+        cfs.addSSTable(s);
+
+        Set<SSTableReader> compacting = Sets.newHashSet(s);
+        SSTableRewriter.overrideOpenInterval(10000000);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+        ICompactionScanner scanner = s.getScanner();
+        CompactionController controller = new CompactionController(cfs, compacting, 0);
+        int files = 1;
+        while(scanner.hasNext())
+        {
+            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+            {
+                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                files++;
+                assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+            }
+            if (files == 3)
+            {
+                //testing to finish when we have nothing written in the new file
+                rewriter.finish();
+                break;
+            }
+        }
+        Thread.sleep(1000);
+        assertEquals(files - 1, cfs.getSSTables().size()); // we never wrote anything to the last file
+        assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        validateCFS(cfs);
+    }
+
+    @Test
+    public void testNumberOfFiles_truncate() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        SSTableReader s = writeFile(cfs, 1000);
+        cfs.addSSTable(s);
+        Set<SSTableReader> compacting = Sets.newHashSet(s);
+        SSTableRewriter.overrideOpenInterval(10000000);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+        ICompactionScanner scanner = s.getScanner();
+        CompactionController controller = new CompactionController(cfs, compacting, 0);
+        int files = 1;
+        while(scanner.hasNext())
+        {
+            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+            if (rewriter.currentWriter().getOnDiskFilePointer() > 25000000)
+            {
+                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                files++;
+                assertEquals(cfs.getSSTables().size(), files); // we have one original file plus the ones we have switched out.
+            }
+        }
+        rewriter.finish();
+        Thread.sleep(1000);
+        assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        cfs.truncateBlocking();
+        validateCFS(cfs);
+    }
+
+    @Test
+    public void testSmallFiles() throws InterruptedException
+    {
+        Keyspace keyspace = Keyspace.open(KEYSPACE);
+        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF);
+        cfs.truncateBlocking();
+        cfs.disableAutoCompaction();
+
+        SSTableReader s = writeFile(cfs, 400);
+        DecoratedKey origFirst = s.first;
+        cfs.addSSTable(s);
+        Set<SSTableReader> compacting = Sets.newHashSet(s);
+        SSTableRewriter.overrideOpenInterval(1000000);
+        SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
+        rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+
+        ICompactionScanner scanner = s.getScanner();
+        CompactionController controller = new CompactionController(cfs, compacting, 0);
+        int files = 1;
+        while(scanner.hasNext())
+        {
+            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
+            if (rewriter.currentWriter().getOnDiskFilePointer() > 2500000)
+            {
+                assertEquals(1, cfs.getSSTables().size()); // we dont open small files early ...
+                assertEquals(origFirst, cfs.getSSTables().iterator().next().first); // ... and the first key should stay the same
+                rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
+                files++;
+            }
+        }
+        rewriter.finish();
+        assertEquals(files, rewriter.finished().size());
+        assertEquals(files, cfs.getSSTables().size());
+        Thread.sleep(1000);
+        assertFileCounts(s.descriptor.directory.list(), 0, 0);
+        validateCFS(cfs);
+    }
+
+    private SSTableReader writeFile(ColumnFamilyStore cfs, int count)
+    {
+        ArrayBackedSortedColumns cf = ArrayBackedSortedColumns.factory.create(cfs.metadata);
+        for (int i = 0; i < count / 100; i++)
+            cf.addColumn(Util.cellname(i), ByteBuffer.allocate(1000), 1);
+        File dir = cfs.directories.getDirectoryForNewSSTables();
+        String filename = cfs.getTempSSTablePath(dir);
+
+        SSTableWriter writer = new SSTableWriter(filename,
+                0,
+                0,
+                cfs.metadata,
+                StorageService.getPartitioner(),
+                new MetadataCollector(cfs.metadata.comparator));
+
+        for (int i = 0; i < count * 5; i++)
+            writer.append(StorageService.getPartitioner().decorateKey(ByteBufferUtil.bytes(i)), cf);
+        return writer.closeAndOpenReader();
+    }
+
+    private void validateCFS(ColumnFamilyStore cfs)
+    {
+        for (SSTableReader sstable : cfs.getSSTables())
+        {
+            assertFalse(sstable.isMarkedCompacted());
+            assertEquals(1, sstable.referenceCount());
+        }
+        assertTrue(cfs.getDataTracker().getCompacting().isEmpty());
+    }
+
+
+    private void assertFileCounts(String [] files, int expectedtmplinkCount, int expectedtmpCount)
+    {
+        int tmplinkcount = 0;
+        int tmpcount = 0;
+        for (String f : files)
+        {
+            if (f.contains("-tmplink-"))
+                tmplinkcount++;
+            if (f.contains("-tmp-"))
+                tmpcount++;
+        }
+        assertEquals(expectedtmplinkCount, tmplinkcount);
+        assertEquals(expectedtmpCount, tmpcount);
+    }
+
+    private SSTableWriter getWriter(ColumnFamilyStore cfs, File directory)
+    {
+        String filename = cfs.getTempSSTablePath(directory);
+        return new SSTableWriter(filename,
+                                 0,
+                                 0,
+                                 cfs.metadata,
+                                 StorageService.getPartitioner(),
+                                 new MetadataCollector(cfs.metadata.comparator));
+    }
+}
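
As a usage note (again a sketch, not part of the patch; identifiers assume the same test setup as above): the reason the rewriter now tracks finishedWriters and finishedOpenedEarly is so that abort() can clean up every writer that was switched out, leaving no -tmp- or -tmplink- files behind and restoring the original sstable starts, which is what testNumberOfFiles_abort and testFileRemoval verify:

    // minimal sketch of the abort path exercised by the tests above
    SSTableRewriter rewriter = new SSTableRewriter(cfs, compacting, 1000, OperationType.COMPACTION, false);
    rewriter.switchWriter(getWriter(cfs, s.descriptor.directory));
    try
    {
        while (scanner.hasNext())
            rewriter.append(new LazilyCompactedRow(controller, Arrays.asList(scanner.next())));
        rewriter.finish();
    }
    catch (Throwable t)
    {
        // abort() aborts the current writer and every switched-out writer,
        // so their tmp/tmplink files are removed and the original starts restored
        rewriter.abort();
        throw new RuntimeException(t);
    }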
