Repository: cassandra Updated Branches: refs/heads/trunk 4cfaf855c -> 2381be314
Start compaction when incremental repair finishes Patch by Blake Eggleston; Reviewed by Marcus Eriksson for CASSANDRA-13454 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2381be31 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2381be31 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2381be31 Branch: refs/heads/trunk Commit: 2381be31404403285948a3097c9ed55e7f901135 Parents: 4cfaf85 Author: Blake Eggleston <bdeggles...@gmail.com> Authored: Mon Apr 17 15:28:28 2017 -0700 Committer: Blake Eggleston <bdeggles...@gmail.com> Committed: Mon Apr 24 09:29:56 2017 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../repair/consistent/LocalSessions.java | 22 ++++++++++++++++++++ .../repair/consistent/LocalSessionTest.java | 13 ++++++++++++ 3 files changed, 36 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2381be31/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index fc06bde..9c933e1 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Start compaction when incremental repair finishes (CASSANDRA-13454) * Add repair streaming preview (CASSANDRA-13257) * Cleanup isIncremental/repairedAt usage (CASSANDRA-13430) * Change protocol to allow sending key space independent of query string (CASSANDRA-10145) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2381be31/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java index d10b9c5..06e102e 100644 --- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java +++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java @@ -50,6 +50,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.cql3.QueryProcessor; import org.apache.cassandra.cql3.UntypedResultSet; @@ -463,9 +466,15 @@ public class LocalSessions "Invalid state transition %s -> %s", session.getState(), state); logger.debug("Setting LocalSession state from {} -> {} for {}", session.getState(), state, session.sessionID); + boolean wasCompleted = session.isCompleted(); session.setState(state); session.setLastUpdate(); save(session); + + if (session.isCompleted() && !wasCompleted) + { + sessionCompleted(session); + } } } @@ -593,6 +602,19 @@ public class LocalSessions } } + @VisibleForTesting + protected void sessionCompleted(LocalSession session) + { + for (TableId tid: session.tableIds) + { + ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(tid); + if (cfs != null) + { + CompactionManager.instance.submitBackground(cfs); + } + } + } + /** * Finalizes the repair session, completing it as successful. * http://git-wip-us.apache.org/repos/asf/cassandra/blob/2381be31/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java index 2a4ce9a..a5197ec 100644 --- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java +++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java @@ -173,6 +173,15 @@ public class LocalSessionTest extends AbstractRepairTest { return true; } + + public Map<UUID, Integer> completedSessions = new HashMap<>(); + + protected void sessionCompleted(LocalSession session) + { + UUID sessionID = session.sessionID; + int calls = completedSessions.getOrDefault(sessionID, 0); + completedSessions.put(sessionID, calls + 1); + } } private static TableMetadata cfm; @@ -450,6 +459,7 @@ public class LocalSessionTest extends AbstractRepairTest sessions.maybeSetRepairing(sessionID); sessions.handleFinalizeProposeMessage(COORDINATOR, new FinalizePropose(sessionID)); + Assert.assertEquals(0, (int) sessions.completedSessions.getOrDefault(sessionID, 0)); sessions.sentMessages.clear(); LocalSession session = sessions.getSession(sessionID); sessions.handleFinalizeCommitMessage(PARTICIPANT1, new FinalizeCommit(sessionID)); @@ -457,6 +467,7 @@ public class LocalSessionTest extends AbstractRepairTest Assert.assertEquals(FINALIZED, session.getState()); Assert.assertEquals(session, sessions.loadUnsafe(sessionID)); Assert.assertTrue(sessions.sentMessages.isEmpty()); + Assert.assertEquals(1, (int) sessions.completedSessions.getOrDefault(sessionID, 0)); } @Test @@ -482,9 +493,11 @@ public class LocalSessionTest extends AbstractRepairTest sessions.sentMessages.clear(); // fail session + Assert.assertEquals(0, (int) sessions.completedSessions.getOrDefault(sessionID, 0)); sessions.failSession(sessionID); Assert.assertEquals(FAILED, session.getState()); assertMessagesSent(sessions, COORDINATOR, new FailSession(sessionID)); + Assert.assertEquals(1, (int) sessions.completedSessions.getOrDefault(sessionID, 0)); } /**