Repository: cassandra Updated Branches: refs/heads/trunk c066f126e -> e1a1b80d4
Don't delete incremental repair sessions if they still have sstables Patch by Blake Eggleston; reviewed by Marcus Eriksson for CASSANDRA-13758 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e1a1b80d Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e1a1b80d Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e1a1b80d Branch: refs/heads/trunk Commit: e1a1b80d424e31eeb5805c710ce010953160e3a4 Parents: c066f12 Author: Blake Eggleston <[email protected]> Authored: Fri Aug 11 14:23:22 2017 -0700 Committer: Blake Eggleston <[email protected]> Committed: Fri Aug 18 10:48:00 2017 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../compaction/CompactionStrategyManager.java | 5 +++ .../db/compaction/PendingRepairManager.java | 5 +++ .../repair/consistent/LocalSessions.java | 33 ++++++++++++---- .../db/compaction/PendingRepairManagerTest.java | 15 +++++++ .../repair/consistent/LocalSessionTest.java | 41 +++++++++++++++++++- 6 files changed, 90 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1a1b80d/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index e015a0b..a02a7bf 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Don't delete incremental repair sessions if they still have sstables (CASSANDRA-13758) * Fix pending repair manager index out of bounds check (CASSANDRA-13769) * Don't use RangeFetchMapCalculator when RF=1 (CASSANDRA-13576) * Don't optimise trivial ranges in RangeFetchMapCalculator (CASSANDRA-13664) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1a1b80d/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java ---------------------------------------------------------------------- diff --git 
a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index 6342a1b..3b1bc41 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -339,6 +339,11 @@ public class CompactionStrategyManager implements INotificationConsumer return ids; } + public boolean hasDataForPendingRepair(UUID sessionID) + { + return Iterables.any(pendingRepairs, prm -> prm.hasDataForSession(sessionID)); + } + public void shutdown() { writeLock.lock(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1a1b80d/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java index 4596381..8ee6025 100644 --- a/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java +++ b/src/java/org/apache/cassandra/db/compaction/PendingRepairManager.java @@ -398,6 +398,11 @@ class PendingRepairManager return strategies.values().contains(strategy); } + public synchronized boolean hasDataForSession(UUID sessionID) + { + return strategies.keySet().contains(sessionID); + } + public Collection<AbstractCompactionTask> createUserDefinedTasks(List<SSTableReader> sstables, int gcBefore) { Map<UUID, List<SSTableReader>> group = sstables.stream().collect(Collectors.groupingBy(s -> s.getSSTableMetadata().pendingRepair)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1a1b80d/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java 
index a25f65c..4ef2c2c 100644 --- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java +++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java @@ -32,7 +32,7 @@ import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; - +import java.util.function.Predicate; import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; @@ -45,18 +45,15 @@ import com.google.common.primitives.Ints; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.schema.Schema; -import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.marshal.BytesType; import org.apache.cassandra.db.marshal.InetAddressType; import org.apache.cassandra.db.marshal.UUIDType; @@ -77,6 +74,8 @@ import org.apache.cassandra.repair.messages.PrepareConsistentResponse; import org.apache.cassandra.repair.messages.RepairMessage; import org.apache.cassandra.repair.messages.StatusRequest; import org.apache.cassandra.repair.messages.StatusResponse; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.StorageService; @@ -260,8 +259,15 @@ public class 
LocalSessions } else if (shouldDelete(session, now)) { - logger.debug("Auto deleting repair session {}", session); - deleteSession(session.sessionID); + if (!sessionHasData(session)) + { + logger.debug("Auto deleting repair session {}", session); + deleteSession(session.sessionID); + } + else + { + logger.warn("Skipping delete of LocalSession {} because it still contains sstables", session.sessionID); + } } else if (shouldCheckStatus(session, now)) { @@ -737,6 +743,17 @@ public class LocalSessions return session != null && session.getState() != FINALIZED && session.getState() != FAILED; } + @VisibleForTesting + protected boolean sessionHasData(LocalSession session) + { + Predicate<TableId> predicate = tid -> { + ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(tid); + return cfs != null && cfs.getCompactionStrategyManager().hasDataForPendingRepair(session.sessionID); + + }; + return Iterables.any(session.tableIds, predicate::test); + } + /** * Returns the repairedAt time for a sessions which is unknown, failed, or finalized * calling this for a session which is in progress throws an exception http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1a1b80d/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java b/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java index 33e996b..2b88c9c 100644 --- a/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/PendingRepairManagerTest.java @@ -285,4 +285,19 @@ public class PendingRepairManagerTest extends AbstractPendingRepairTest SSTableReader sstable = makeSSTable(true); prm.getOrCreate(sstable); } + + @Test + public void sessionHasData() + { + PendingRepairManager prm = csm.getPendingRepairManagers().get(0); + + UUID repairID = 
registerSession(cfs, true, true); + LocalSessionAccessor.prepareUnsafe(repairID, COORDINATOR, PARTICIPANTS); + + Assert.assertFalse(prm.hasDataForSession(repairID)); + SSTableReader sstable = makeSSTable(true); + mutateRepaired(sstable, repairID); + prm.addSSTable(sstable); + Assert.assertTrue(prm.hasDataForSession(repairID)); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e1a1b80d/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java index c59462e..be048fb 100644 --- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java +++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java @@ -182,6 +182,12 @@ public class LocalSessionTest extends AbstractRepairTest int calls = completedSessions.getOrDefault(sessionID, 0); completedSessions.put(sessionID, calls + 1); } + + boolean sessionHasData = false; + protected boolean sessionHasData(LocalSession session) + { + return sessionHasData; + } } private static TableMetadata cfm; @@ -865,10 +871,10 @@ public class LocalSessionTest extends AbstractRepairTest } /** - * Sessions past the auto delete cutoff should be deleted + * Sessions past the auto delete cutoff with no sstables should be deleted */ @Test - public void cleanupDelete() throws Exception + public void cleanupDeleteNoSSTables() throws Exception { LocalSessions sessions = new InstrumentedLocalSessions(); sessions.start(); @@ -895,6 +901,37 @@ public class LocalSessionTest extends AbstractRepairTest } /** + * Sessions past the auto delete cutoff that still have sstables should not be deleted + */ + @Test + public void cleanupDeleteSSTablesRemaining() throws Exception + { + InstrumentedLocalSessions sessions = new InstrumentedLocalSessions(); + sessions.start(); + + int time = 
FBUtilities.nowInSeconds() - LocalSessions.AUTO_FAIL_TIMEOUT - 1; + LocalSession failed = sessionWithTime(time - 1, time); + failed.setState(FAILED); + + LocalSession finalized = sessionWithTime(time - 1, time); + finalized.setState(FINALIZED); + + sessions.putSessionUnsafe(failed); + sessions.putSessionUnsafe(finalized); + Assert.assertNotNull(sessions.getSession(failed.sessionID)); + Assert.assertNotNull(sessions.getSession(finalized.sessionID)); + + sessions.sessionHasData = true; + sessions.cleanup(); + + Assert.assertNotNull(sessions.getSession(failed.sessionID)); + Assert.assertNotNull(sessions.getSession(finalized.sessionID)); + + Assert.assertNotNull(sessions.loadUnsafe(failed.sessionID)); + Assert.assertNotNull(sessions.loadUnsafe(finalized.sessionID)); + } + + /** * Sessions should start checking the status of their participants if * there hasn't been activity for the CHECK_STATUS_TIMEOUT period */ --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
