This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
new 1053e3b475 Partial compaction can resurrect deleted data
1053e3b475 is described below
commit 1053e3b475829c7f2d0dc4ab59322d5819d1496a
Author: Tobias Lindaaker <[email protected]>
AuthorDate: Wed May 17 10:42:59 2023 -0700
Partial compaction can resurrect deleted data
patch by Tobias Lindaaker, Marcus Eriksson; reviewed by David Capwell,
Marcus Eriksson for CASSANDRA-18507
---
CHANGES.txt | 1 +
.../db/compaction/CompactionController.java | 2 +-
.../cassandra/db/compaction/CompactionTask.java | 25 ++-
.../test/CompactionOverlappingSSTableTest.java | 114 ++++++++++++
.../db/compaction/PartialCompactionsTest.java | 207 +++++++++++++++++++++
5 files changed, 340 insertions(+), 9 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 097950afb2..6db0e3b084 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0.10
+ * Partial compaction can resurrect deleted data (CASSANDRA-18507)
* Allow internal address to change with reconnecting snitches (CASSANDRA-16718)
* Fix quoting in toCqlString methods of UDTs and aggregates (CASSANDRA-17918)
* NPE when deserializing malformed collections from client (CASSANDRA-18505)
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
index bb2094f931..cee2b58f75 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java
@@ -116,7 +116,7 @@ public class CompactionController extends AbstractCompactionController
}
}
- private void refreshOverlaps()
+ void refreshOverlaps()
{
if (NEVER_PURGE_TOMBSTONES || cfs.getNeverPurgeTombstones())
return;
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
index 3b0e1729d4..90abac3fb2 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java
@@ -83,13 +83,14 @@ public class CompactionTask extends AbstractCompactionTask
if (partialCompactionsAcceptable() && transaction.originals().size() >
1)
{
// Try again w/o the largest one.
- logger.warn("insufficient space to compact all requested files.
{}MB required, {} for compaction {}",
+ SSTableReader removedSSTable =
cfs.getMaxSizeFile(nonExpiredSSTables);
+ logger.warn("insufficient space to compact all requested files.
{}MB required, {} for compaction {} - removing largest SSTable: {}",
(float) expectedSize / 1024 / 1024,
StringUtils.join(transaction.originals(), ", "),
- transaction.opId());
+ transaction.opId(),
+ removedSSTable);
// Note that we have removed files that are still marked as
compacting.
// This suboptimal but ok since the caller will unmark all the
sstables at the end.
- SSTableReader removedSSTable =
cfs.getMaxSizeFile(nonExpiredSSTables);
transaction.cancel(removedSSTable);
return true;
}
@@ -123,7 +124,12 @@ public class CompactionTask extends AbstractCompactionTask
final Set<SSTableReader> fullyExpiredSSTables =
controller.getFullyExpiredSSTables();
// select SSTables to compact based on available disk space.
-
buildCompactionCandidatesForAvailableDiskSpace(fullyExpiredSSTables);
+ if
(!buildCompactionCandidatesForAvailableDiskSpace(fullyExpiredSSTables))
+ {
+ // The set of sstables has changed (one or more were excluded
due to limited available disk space).
+ // We need to recompute the overlaps between sstables.
+ controller.refreshOverlaps();
+ }
// sanity check: all sstables must belong to the same cfs
assert !Iterables.any(transaction.originals(), new
Predicate<SSTableReader>()
@@ -345,13 +351,15 @@ public class CompactionTask extends AbstractCompactionTask
* Checks if we have enough disk space to execute the compaction. Drops
the largest sstable out of the Task until
* there's enough space (in theory) to handle the compaction. Does not
take into account space that will be taken by
* other compactions.
+ *
+ * @return true if there is enough disk space to execute the complete
compaction, false if some sstables are excluded.
*/
- protected void buildCompactionCandidatesForAvailableDiskSpace(final
Set<SSTableReader> fullyExpiredSSTables)
+ protected boolean buildCompactionCandidatesForAvailableDiskSpace(final
Set<SSTableReader> fullyExpiredSSTables)
{
if(!cfs.isCompactionDiskSpaceCheckEnabled() && compactionType ==
OperationType.COMPACTION)
{
- logger.info("Compaction space check is disabled");
- return; // try to compact all SSTables
+ logger.info("Compaction space check is disabled - trying to
compact all sstables");
+ return true;
}
final Set<SSTableReader> nonExpiredSSTables =
Sets.difference(transaction.originals(), fullyExpiredSSTables);
@@ -395,8 +403,9 @@ public class CompactionTask extends AbstractCompactionTask
{
CompactionManager.instance.incrementCompactionsReduced();
CompactionManager.instance.incrementSstablesDropppedFromCompactions(sstablesRemoved);
+ return false;
}
-
+ return true;
}
protected int getLevel()
diff --git a/test/distributed/org/apache/cassandra/distributed/test/CompactionOverlappingSSTableTest.java b/test/distributed/org/apache/cassandra/distributed/test/CompactionOverlappingSSTableTest.java
new file mode 100644
index 0000000000..54f8ad7a7a
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/CompactionOverlappingSSTableTest.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.lifecycle.SSTableSet;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
+import static org.junit.Assert.assertEquals;
+
+public class CompactionOverlappingSSTableTest extends TestBaseImpl // CASSANDRA-18507: a partial compaction must not resurrect deleted rows
+{
+    @Test
+    public void partialCompactionOverlappingTest() throws IOException
+    {
+        // single node, single data dir; BB (below) can make one disk-space check report insufficient space
+        try (Cluster cluster = init(builder().withNodes(1)
+                                             .withDataDirCount(1)
+                                             .withInstanceInitializer(BB::install)
+                                             .start()))
+        {
+            cluster.schemaChange(withKeyspace("alter keyspace %s with replication = {'class': 'SimpleStrategy', 'replication_factor':3}"));
+            cluster.schemaChange(withKeyspace("create table %s.tbl (id int primary key) with compaction = {'class':'SizeTieredCompactionStrategy', 'enabled': 'false'} AND gc_grace_seconds=0"));
+            Set<Integer> expected = Sets.newHashSetWithExpectedSize(990); // ids 10..999: the rows that must stay visible
+            for (int i = 0; i < 1000; i++)
+            {
+                cluster.coordinator(1).execute(withKeyspace("insert into %s.tbl (id) values (?)"), ConsistencyLevel.ONE, i);
+                if (i >= 10)
+                    expected.add(i);
+            }
+            cluster.get(1).flush(KEYSPACE); // all 1000 inserts end up in a single sstable
+            for (int i = 0; i < 10; i++)
+            {
+                cluster.coordinator(1).execute(withKeyspace("delete from %s.tbl where id = ?"), ConsistencyLevel.ONE, i);
+                cluster.get(1).flush(KEYSPACE); // one small sstable per deleted id
+            }
+            assertEquals(expected, Arrays.stream(cluster.coordinator(1).execute(withKeyspace("select * from %s.tbl"), ConsistencyLevel.ONE))
+                                         .map(x -> x[0])
+                                         .collect(Collectors.toSet()));
+
+            Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS); // gc_grace_seconds=0: make sure tombstones are gc:able
+
+            cluster.get(1).runOnInstance(() -> {
+                BB.enabled.set(true); // the next disk-space check fails, so the compaction is reduced (largest sstable excluded)
+                ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+                cfs.forceMajorCompaction();
+                assertEquals("We should have 2 sstables (not 1) after major compaction since we reduced the scope of the compaction",
+                             2, Iterables.size(cfs.getSSTables(SSTableSet.CANONICAL)));
+            });
+            assertEquals(expected, Arrays.stream(cluster.coordinator(1).execute(withKeyspace("select * from %s.tbl"), ConsistencyLevel.ONE))
+                                         .map(x -> x[0])
+                                         .collect(Collectors.toSet())); // deleted ids 0..9 must not have been resurrected
+        }
+    }
+
+    public static class BB // ByteBuddy hook that can make Directories.hasAvailableDiskSpace(long, long) fail once
+    {
+        static AtomicBoolean enabled = new AtomicBoolean(); // when true, the next intercepted call returns false and clears the flag
+        public static void install(ClassLoader cl, Integer i) // instance initializer: rebases Directories in the instance's class loader
+        {
+            new ByteBuddy().rebase(Directories.class)
+                           .method(named("hasAvailableDiskSpace").and(takesArguments(2)))
+                           .intercept(MethodDelegation.to(BB.class))
+                           .make()
+                           .load(cl, ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        public static boolean hasAvailableDiskSpace(long ignore1, long ignore2) // one-shot failure after enabled is set; true otherwise
+        {
+            if (enabled.get())
+            {
+                enabled.set(false);
+                return false;
+            }
+            return true;
+        }
+    }
+}
diff --git a/test/unit/org/apache/cassandra/db/compaction/PartialCompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/PartialCompactionsTest.java
new file mode 100644
index 0000000000..9cb3872f80
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/compaction/PartialCompactionsTest.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db.compaction;
+
+import java.util.Iterator;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.Util;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Directories;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.RowUpdateBuilder;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.TableMetadataRef;
+import org.apache.cassandra.utils.CloseableIterator;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+
+public class PartialCompactionsTest extends SchemaLoader // CASSANDRA-18507: sstables dropped for lack of disk space must not cause data resurrection
+{
+    static final String KEYSPACE = PartialCompactionsTest.class.getSimpleName();
+    static final String TABLE = "testtable";
+
+    @BeforeClass
+    public static void initSchema()
+    {
+        CompactionManager.instance.disableAutoCompaction(); // no background compactions; the test compacts explicitly
+
+        SchemaLoader.createKeyspace(KEYSPACE,
+                                    KeyspaceParams.simple(1),
+                                    SchemaLoader.standardCFMD(KEYSPACE, TABLE));
+
+        LimitableDataDirectory.applyTo(KEYSPACE, TABLE); // re-create the table's store over directories whose free space can be faked
+    }
+
+    @Before
+    public void prepareCFS()
+    {
+        LimitableDataDirectory.setAvailableSpace(cfStore(), null); // null: report the real available space
+    }
+
+    @After
+    public void truncateCF()
+    {
+        cfStore().truncateBlocking();
+        LifecycleTransaction.waitForDeletions();
+    }
+
+    private static ColumnFamilyStore cfStore()
+    {
+        return Keyspace.open(KEYSPACE).getColumnFamilyStore(TABLE);
+    }
+
+    @Test
+    public void shouldNotResurrectDataFromSSTableExcludedDueToInsufficientSpace()
+    {
+        // given
+        ColumnFamilyStore cfs = cfStore();
+        int few = 10, many = 10 * few;
+
+        // a large sstable as the oldest
+        createDataSSTable(cfs, 0, many);
+        // more inserts (to have more than one sstable to compact)
+        createDataSSTable(cfs, many, many + few);
+        // delete data that's in both of the prior sstables
+        createTombstonesSSTable(cfs, many - few / 2, many + few / 2);
+
+        // emulate there not being enough space to compact all sstables
+        LimitableDataDirectory.setAvailableSpace(cfs, enoughSpaceForAllButTheLargestSSTable(cfs));
+
+        // when - run a compaction where all tombstones have timed out
+        FBUtilities.waitOnFutures(CompactionManager.instance.submitMaximal(cfs, Integer.MAX_VALUE, false));
+
+        // then - the tombstones should not be removed
+        assertEquals("live sstables after compaction", 2, cfs.getLiveSSTables().size()); // the excluded sstable plus the compaction output
+        assertEquals("remaining live rows after compaction", many, liveRows(cfs)); // (many + few) rows written, few of them deleted
+    }
+
+    private static long enoughSpaceForAllButTheLargestSSTable(ColumnFamilyStore cfs)
+    {
+        long totalSize = 1, maxSize = 0; // starting at 1: result is one byte more than the combined size of all but the largest
+        for (SSTableReader ssTable : cfs.getLiveSSTables())
+        {
+            long size = ssTable.onDiskLength();
+            if (size > maxSize) maxSize = size;
+            totalSize += size;
+        }
+        return totalSize - maxSize;
+    }
+
+    private static int liveRows(ColumnFamilyStore cfs) // number of rows read back for "key1", summed over the returned partitions
+    {
+        return Util.getAll(Util.cmd(cfs, "key1").build()).stream()
+                   .map(partition -> count(partition.rowIterator()))
+                   .reduce(Integer::sum)
+                   .orElse(0);
+    }
+
+    private static int count(Iterator<?> iter) // consumes iter and returns its element count; closes it if it is a CloseableIterator
+    {
+        try (CloseableIterator<?> unused = iter instanceof CloseableIterator ? (CloseableIterator<?>) iter : null) // a null resource is allowed and not closed
+        {
+            int count = 0;
+            for (; iter.hasNext(); iter.next())
+            {
+                count++;
+            }
+            return count;
+        }
+    }
+
+    private static void createDataSSTable(ColumnFamilyStore cfs, int firstKey, int endKey) // writes rows firstKey..endKey-1 under "key1", then flushes
+    {
+        for (int i = firstKey; i < endKey; i++)
+        {
+            new RowUpdateBuilder(cfs.metadata(), 0, "key1")
+                .clustering(String.valueOf(i))
+                .add("val", String.valueOf(i))
+                .build()
+                .applyUnsafe();
+        }
+        cfs.forceBlockingFlush();
+    }
+
+    private static void createTombstonesSSTable(ColumnFamilyStore cfs, int firstKey, int endKey) // deletes rows firstKey..endKey-1 under "key1", then flushes
+    {
+        for (int i = firstKey; i < endKey; i++)
+        {
+            RowUpdateBuilder.deleteRow(cfs.metadata(), 1, "key1", String.valueOf(i)).applyUnsafe();
+        }
+        cfs.forceBlockingFlush();
+    }
+
+    private static class LimitableDataDirectory extends Directories.DataDirectory // DataDirectory whose reported available space can be overridden
+    {
+        private Long availableSpace; // null means no override
+
+        LimitableDataDirectory(Directories.DataDirectory dataDirectory)
+        {
+            super(dataDirectory.location);
+        }
+
+        @Override
+        public long getAvailableSpace()
+        {
+            if (availableSpace != null)
+                return availableSpace;
+            return super.getAvailableSpace();
+        }
+
+        public static void setAvailableSpace(ColumnFamilyStore cfs, Long availableSpace) // sets (or, with null, clears) the override on every writeable location
+        {
+            for (Directories.DataDirectory location : cfs.getDirectories().getWriteableLocations())
+            {
+                assertThat("ColumnFamilyStore set up with ability to emulate limited disk space",
+                           location, instanceOf(LimitableDataDirectory.class));
+                ((LimitableDataDirectory) location).availableSpace = availableSpace;
+            }
+        }
+
+        public static void applyTo(String ks, String cf) // drops the table's store and re-creates it over wrapped directories
+        {
+            Keyspace keyspace = Keyspace.open(ks);
+            ColumnFamilyStore store = keyspace.getColumnFamilyStore(cf);
+            TableMetadataRef metadata = store.metadata;
+            keyspace.dropCf(metadata.id);
+            ColumnFamilyStore cfs = ColumnFamilyStore.createColumnFamilyStore(keyspace, cf, metadata, wrapDirectoriesOf(store), false, false, true);
+            keyspace.initCfCustom(cfs);
+        }
+
+        private static Directories wrapDirectoriesOf(ColumnFamilyStore cfs) // same writeable locations, each wrapped in a LimitableDataDirectory
+        {
+            Directories.DataDirectory[] original = cfs.getDirectories().getWriteableLocations();
+            Directories.DataDirectory[] wrapped = new Directories.DataDirectory[original.length];
+            for (int i = 0; i < wrapped.length; i++)
+            {
+                wrapped[i] = new LimitableDataDirectory(original[i]);
+            }
+            return new Directories(cfs.metadata(), wrapped);
+        }
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]