This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
     new 1053e3b475 Partial compaction can resurrect deleted data
1053e3b475 is described below

commit 1053e3b475829c7f2d0dc4ab59322d5819d1496a
Author: Tobias Lindaaker <[email protected]>
AuthorDate: Wed May 17 10:42:59 2023 -0700

    Partial compaction can resurrect deleted data
    
    patch by Tobias Lindaaker, Marcus Eriksson; reviewed by David Capwell, Marcus Eriksson for CASSANDRA-18507
---
 CHANGES.txt                                        |   1 +
 .../db/compaction/CompactionController.java        |   2 +-
 .../cassandra/db/compaction/CompactionTask.java    |  25 ++-
 .../test/CompactionOverlappingSSTableTest.java     | 114 ++++++++++++
 .../db/compaction/PartialCompactionsTest.java      | 207 +++++++++++++++++++++
 5 files changed, 340 insertions(+), 9 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 097950afb2..6db0e3b084 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0.10
+ * Partial compaction can resurrect deleted data (CASSANDRA-18507)
  * Allow internal address to change with reconnecting snitches (CASSANDRA-16718)
  * Fix quoting in toCqlString methods of UDTs and aggregates (CASSANDRA-17918)
  * NPE when deserializing malformed collections from client (CASSANDRA-18505)
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index bb2094f931..cee2b58f75 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -116,7 +116,7 @@ public class CompactionController extends AbstractCompactionController
         }
     }
 
-    private void refreshOverlaps()
+    void refreshOverlaps()
     {
         if (NEVER_PURGE_TOMBSTONES || cfs.getNeverPurgeTombstones())
             return;
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 3b0e1729d4..90abac3fb2 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -83,13 +83,14 @@ public class CompactionTask extends AbstractCompactionTask
         if (partialCompactionsAcceptable() && transaction.originals().size() > 
1)
         {
             // Try again w/o the largest one.
-            logger.warn("insufficient space to compact all requested files. 
{}MB required, {} for compaction {}",
+            SSTableReader removedSSTable = 
cfs.getMaxSizeFile(nonExpiredSSTables);
+            logger.warn("insufficient space to compact all requested files. 
{}MB required, {} for compaction {} - removing largest SSTable: {}",
                         (float) expectedSize / 1024 / 1024,
                         StringUtils.join(transaction.originals(), ", "),
-                        transaction.opId());
+                        transaction.opId(),
+                        removedSSTable);
             // Note that we have removed files that are still marked as 
compacting.
             // This suboptimal but ok since the caller will unmark all the 
sstables at the end.
-            SSTableReader removedSSTable = 
cfs.getMaxSizeFile(nonExpiredSSTables);
             transaction.cancel(removedSSTable);
             return true;
         }
@@ -123,7 +124,12 @@ public class CompactionTask extends AbstractCompactionTask
             final Set<SSTableReader> fullyExpiredSSTables = 
controller.getFullyExpiredSSTables();
 
             // select SSTables to compact based on available disk space.
-            
buildCompactionCandidatesForAvailableDiskSpace(fullyExpiredSSTables);
+            if 
(!buildCompactionCandidatesForAvailableDiskSpace(fullyExpiredSSTables))
+            {
+                // The set of sstables has changed (one or more were excluded 
due to limited available disk space).
+                // We need to recompute the overlaps between sstables.
+                controller.refreshOverlaps();
+            }
 
             // sanity check: all sstables must belong to the same cfs
             assert !Iterables.any(transaction.originals(), new 
Predicate<SSTableReader>()
@@ -345,13 +351,15 @@ public class CompactionTask extends AbstractCompactionTask
      * Checks if we have enough disk space to execute the compaction.  Drops 
the largest sstable out of the Task until
      * there's enough space (in theory) to handle the compaction.  Does not 
take into account space that will be taken by
      * other compactions.
+     *
+     * @return true if there is enough disk space to execute the complete 
compaction, false if some sstables are excluded.
      */
-    protected void buildCompactionCandidatesForAvailableDiskSpace(final 
Set<SSTableReader> fullyExpiredSSTables)
+    protected boolean buildCompactionCandidatesForAvailableDiskSpace(final 
Set<SSTableReader> fullyExpiredSSTables)
     {
         if(!cfs.isCompactionDiskSpaceCheckEnabled() && compactionType == 
OperationType.COMPACTION)
         {
-            logger.info("Compaction space check is disabled");
-            return; // try to compact all SSTables
+            logger.info("Compaction space check is disabled - trying to 
compact all sstables");
+            return true;
         }
 
         final Set<SSTableReader> nonExpiredSSTables = 
Sets.difference(transaction.originals(), fullyExpiredSSTables);
@@ -395,8 +403,9 @@ public class CompactionTask extends AbstractCompactionTask
         {
             CompactionManager.instance.incrementCompactionsReduced();
             
CompactionManager.instance.incrementSstablesDropppedFromCompactions(sstablesRemoved);
+            return false;
         }
-
+        return true;
     }
 
     protected int getLevel()
diff --git a/test/distributed/org/apache/cassandra/distributed/test/CompactionOverlappingSSTableTest.java b/test/distributed/org/apache/cassandra/distributed/test/CompactionOverlappingSSTableTest.java
new file mode 100644
index 0000000000..54f8ad7a7a
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/CompactionOverlappingSSTableTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static org.junit.Assert.assertEquals;
+
+public class CompactionOverlappingSSTableTest extends TestBaseImpl
+{
+    @Test
+    public void partialCompactionOverlappingTest() throws IOException
+    {
+
+        try (Cluster cluster = init(builder().withNodes(1)
+                                             .withDataDirCount(1)
+                                             
.withInstanceInitializer(BB::install)
+                                             .start()))
+        {
+            cluster.schemaChange(withKeyspace("alter keyspace %s with 
replication = {'class': 'SimpleStrategy', 'replication_factor':3}"));
+            cluster.schemaChange(withKeyspace("create table %s.tbl (id int 
primary key) with compaction = {'class':'SizeTieredCompactionStrategy', 
'enabled': 'false'} AND gc_grace_seconds=0"));
+            Set<Integer> expected = Sets.newHashSetWithExpectedSize(990);
+            for (int i = 0; i < 1000; i++)
+            {
+                cluster.coordinator(1).execute(withKeyspace("insert into 
%s.tbl (id) values (?)"), ConsistencyLevel.ONE, i);
+                if (i >= 10)
+                    expected.add(i);
+            }
+            cluster.get(1).flush(KEYSPACE);
+            for (int i = 0; i < 10; i++)
+            {
+                cluster.coordinator(1).execute(withKeyspace("delete from 
%s.tbl where id = ?"), ConsistencyLevel.ONE, i);
+                cluster.get(1).flush(KEYSPACE);
+            }
+            assertEquals(expected, 
Arrays.stream(cluster.coordinator(1).execute(withKeyspace("select * from 
%s.tbl"), ConsistencyLevel.ONE))
+                                         .map(x -> x[0])
+                                         .collect(Collectors.toSet()));
+
+            Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); // 
make sure tombstones are gc:able
+
+            cluster.get(1).runOnInstance(() -> {
+                BB.enabled.set(true);
+                ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+                cfs.forceMajorCompaction();
+                assertEquals("We should have 2 sstables (not 1) after major 
compaction since we reduced the scope of the compaction",
+                             2, 
Iterables.size(cfs.getSSTables(SSTableSet.CANONICAL)));
+            });
+            assertEquals(expected, 
Arrays.stream(cluster.coordinator(1).execute(withKeyspace("select * from 
%s.tbl"), ConsistencyLevel.ONE))
+                                         .map(x -> x[0])
+                                         .collect(Collectors.toSet()));
+        }
+    }
+
+    public static class BB
+    {
+        static AtomicBoolean enabled = new AtomicBoolean();
+        public static void install(ClassLoader cl, Integer i)
+        {
+            new ByteBuddy().rebase(Directories.class)
+                           
.method(named("hasAvailableDiskSpace").and(takesArguments(2)))
+                           .intercept(MethodDelegation.to(BB.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        public static boolean hasAvailableDiskSpace(long ignore1, long ignore2)
+        {
+            if (enabled.get())
+            {
+                enabled.set(false);
+                return false;
+            }
+            return true;
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/db/compaction/PartialCompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/PartialCompactionsTest.java
new file mode 100644
index 0000000000..9cb3872f80
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/PartialCompactionsTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import java.util.Iterator;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+
+public class PartialCompactionsTest extends SchemaLoader
+{
+    static final String KEYSPACE = 
PartialCompactionsTest.class.getSimpleName();
+    static final String TABLE = "testtable";
+
+    @BeforeClass
+    public static void initSchema()
+    {
+        CompactionManager.instance.disableAutoCompaction();
+
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, 
TABLE));
+
+        LimitableDataDirectory.applyTo(KEYSPACE, TABLE);
+    }
+
+    @Before
+    public void prepareCFS()
+    {
+        LimitableDataDirectory.setAvailableSpace(cfStore(), null);
+    }
+
+    @After
+    public void truncateCF()
+    {
+        cfStore().truncateBlocking();
+        LifecycleTransaction.waitForDeletions();
+    }
+
+    private static ColumnFamilyStore cfStore()
+    {
+        return Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
+    }
+
+    @Test
+    public void 
shouldNotResurrectDataFromSSTableExcludedDueToInsufficientSpace()
+    {
+        // given
+        ColumnFamilyStore cfs = cfStore();
+        int few = 10, many = 10 * few;
+
+        // a large sstable as the oldest
+        createDataSSTable(cfs, 0, many);
+        // more inserts (to have more than one sstable to compact)
+        createDataSSTable(cfs, many, many + few);
+        // delete data that's in both of the prior sstables
+        createTombstonesSSTable(cfs, many - few / 2, many + few / 2);
+
+        // emulate there not being enough space to compact all sstables
+        LimitableDataDirectory.setAvailableSpace(cfs, 
enoughSpaceForAllButTheLargestSSTable(cfs));
+
+        // when - run a compaction where all tombstones have timed out
+        
FBUtilities.waitOnFutures(CompactionManager.instance.submitMaximal(cfs, 
Integer.MAX_VALUE, false));
+
+        // then - the tombstones should not be removed
+        assertEquals("live sstables after compaction", 2, 
cfs.getLiveSSTables().size());
+        assertEquals("remaining live rows after compaction", many, 
liveRows(cfs));
+    }
+
+    private static long 
enoughSpaceForAllButTheLargestSSTable(ColumnFamilyStore cfs)
+    {
+        long totalSize = 1, maxSize = 0;
+        for (SSTableReader ssTable : cfs.getLiveSSTables())
+        {
+            long size = ssTable.onDiskLength();
+            if (size > maxSize) maxSize = size;
+            totalSize += size;
+        }
+        return totalSize - maxSize;
+    }
+
+    private static int liveRows(ColumnFamilyStore cfs)
+    {
+        return Util.getAll(Util.cmd(cfs, "key1").build()).stream()
+                   .map(partition -> count(partition.rowIterator()))
+                   .reduce(Integer::sum)
+                   .orElse(0);
+    }
+
+    private static int count(Iterator<?> iter)
+    {
+        try (CloseableIterator<?> unused = iter instanceof CloseableIterator ? 
(CloseableIterator<?>) iter : null)
+        {
+            int count = 0;
+            for (; iter.hasNext(); iter.next())
+            {
+                count++;
+            }
+            return count;
+        }
+    }
+
+    private static void createDataSSTable(ColumnFamilyStore cfs, int firstKey, 
int endKey)
+    {
+        for (int i = firstKey; i < endKey; i++)
+        {
+            new RowUpdateBuilder(cfs.metadata(), 0, "key1")
+            .clustering(String.valueOf(i))
+            .add("val", String.valueOf(i))
+            .build()
+            .applyUnsafe();
+        }
+        cfs.forceBlockingFlush();
+    }
+
+    private static void createTombstonesSSTable(ColumnFamilyStore cfs, int 
firstKey, int endKey)
+    {
+        for (int i = firstKey; i < endKey; i++)
+        {
+            RowUpdateBuilder.deleteRow(cfs.metadata(), 1, "key1", 
String.valueOf(i)).applyUnsafe();
+        }
+        cfs.forceBlockingFlush();
+    }
+
+    private static class LimitableDataDirectory extends 
Directories.DataDirectory
+    {
+        private Long availableSpace;
+
+        LimitableDataDirectory(Directories.DataDirectory dataDirectory)
+        {
+            super(dataDirectory.location);
+        }
+
+        @Override
+        public long getAvailableSpace()
+        {
+            if (availableSpace != null)
+                return availableSpace;
+            return super.getAvailableSpace();
+        }
+
+        public static void setAvailableSpace(ColumnFamilyStore cfs, Long 
availableSpace)
+        {
+            for (Directories.DataDirectory location : 
cfs.getDirectories().getWriteableLocations())
+            {
+                assertThat("ColumnFamilyStore set up with ability to emulate 
limited disk space",
+                           location, instanceOf(LimitableDataDirectory.class));
+                ((LimitableDataDirectory) location).availableSpace = 
availableSpace;
+            }
+        }
+
+        public static void applyTo(String ks, String cf)
+        {
+            Keyspace keyspace = Keyspace.open(ks);
+            ColumnFamilyStore store = keyspace.getColumnFamilyStore(cf);
+            TableMetadataRef metadata = store.metadata;
+            keyspace.dropCf(metadata.id);
+            ColumnFamilyStore cfs = 
ColumnFamilyStore.createColumnFamilyStore(keyspace, cf, metadata, 
wrapDirectoriesOf(store), false, false, true);
+            keyspace.initCfCustom(cfs);
+        }
+
+        private static Directories wrapDirectoriesOf(ColumnFamilyStore cfs)
+        {
+            Directories.DataDirectory[] original = 
cfs.getDirectories().getWriteableLocations();
+            Directories.DataDirectory[] wrapped = new 
Directories.DataDirectory[original.length];
+            for (int i = 0; i < wrapped.length; i++)
+            {
+                wrapped[i] = new LimitableDataDirectory(original[i]);
+            }
+            return new Directories(cfs.metadata(), wrapped);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to