This is an automated email from the ASF dual-hosted git repository.
mck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 5c84ed9 Send FAILED_SESSION_MSG on shutdown and on in-progress repairs during startup
5c84ed9 is described below
commit 5c84ed9ae8227e550630768a47fef7b2d1f1f1d7
Author: Marcus Eriksson <[email protected]>
AuthorDate: Fri Feb 5 10:20:15 2021 +0100
Send FAILED_SESSION_MSG on shutdown and on in-progress repairs during startup
patch by Marcus Eriksson; reviewed by Adam Holmberg, Chris Lohfink, Mick Semb Wever for CASSANDRA-16425
---
CHANGES.txt | 1 +
.../cassandra/repair/consistent/LocalSessions.java | 46 +++++-
.../cassandra/service/ActiveRepairService.java | 5 +
.../apache/cassandra/service/StorageService.java | 1 +
.../distributed/test/IncRepairAdminTest.java | 167 +++++++++++++++++++++
.../repair/consistent/LocalSessionAccessor.java | 15 +-
.../repair/consistent/LocalSessionTest.java | 87 ++++++++++-
7 files changed, 306 insertions(+), 16 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 6370760..2c834d2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-beta5
+ * Send FAILED_SESSION_MSG on shutdown and on in-progress repairs during startup (CASSANDRA-16425)
* Reinstate removed ApplicationState padding (CASSANDRA-16484)
* Expose data dirs to ColumnFamilyStoreMBean (CASSANDRA-16335)
* Add possibility to copy SSTables in SSTableImporter instead of moving them (CASSANDRA-16407)
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
index 0c65ba0..e6ca3ee 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
@@ -351,13 +351,44 @@ public class LocalSessions
}
catch (IllegalArgumentException | NullPointerException e)
{
- logger.warn("Unable to load malformed repair session {}, ignoring", row.has("parent_id") ? row.getUUID("parent_id") : null);
+ logger.warn("Unable to load malformed repair session {}, removing", row.has("parent_id") ? row.getUUID("parent_id") : null);
+ if (row.has("parent_id"))
+ deleteRow(row.getUUID("parent_id"));
}
}
sessions = ImmutableMap.copyOf(loadedSessions);
+ failOngoingRepairs();
started = true;
}
+ public synchronized void stop()
+ {
+ if (!started)
+ return;
+ started = false;
+ failOngoingRepairs();
+ }
+
+ private void failOngoingRepairs()
+ {
+ for (LocalSession session : sessions.values())
+ {
+ synchronized (session)
+ {
+ switch (session.getState())
+ {
+ case FAILED:
+ case FINALIZED:
+ case FINALIZE_PROMISED:
+ continue;
+ default:
+ logger.info("Found repair session {} with state = {} - failing the repair", session.sessionID, session.getState());
+ failSession(session, true);
+ }
+ }
+ }
+ }
+
public boolean isStarted()
{
return started;
@@ -642,7 +673,8 @@ public class LocalSessions
MessagingService.instance().send(message, destination);
}
- private void setStateAndSave(LocalSession session, ConsistentSession.State
state)
+ @VisibleForTesting
+ void setStateAndSave(LocalSession session, ConsistentSession.State state)
{
synchronized (session)
{
@@ -671,20 +703,24 @@ public class LocalSessions
public void failSession(UUID sessionID, boolean sendMessage)
{
- LocalSession session = getSession(sessionID);
+ failSession(getSession(sessionID), sendMessage);
+ }
+
+ public void failSession(LocalSession session, boolean sendMessage)
+ {
if (session != null)
{
synchronized (session)
{
if (session.getState() != FAILED)
{
- logger.info("Failing local repair session {}", sessionID);
+ logger.info("Failing local repair session {}",
session.sessionID);
setStateAndSave(session, FAILED);
}
}
if (sendMessage)
{
- sendMessage(session.coordinator,
Message.out(FAILED_SESSION_MSG, new FailSession(sessionID)));
+ sendMessage(session.coordinator,
Message.out(FAILED_SESSION_MSG, new FailSession(session.sessionID)));
}
}
}
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 58587be..165582b 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -221,6 +221,11 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
TimeUnit.SECONDS);
}
+ public void stop()
+ {
+ consistent.local.stop();
+ }
+
@Override
public List<Map<String, String>> getSessions(boolean all, String rangesStr)
{
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index a8ec34d..8c5437c 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -4848,6 +4848,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
shutdownClientServers();
ScheduledExecutors.optionalTasks.shutdown();
Gossiper.instance.stop();
+ ActiveRepairService.instance.stop();
if (!isFinalShutdown)
setMode(Mode.DRAINING, "shutting down MessageService", false);
diff --git a/test/distributed/org/apache/cassandra/distributed/test/IncRepairAdminTest.java b/test/distributed/org/apache/cassandra/distributed/test/IncRepairAdminTest.java
new file mode 100644
index 0000000..236c819
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/IncRepairAdminTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Test;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static
org.apache.cassandra.repair.consistent.ConsistentSession.State.REPAIRING;
+import static org.junit.Assert.assertTrue;
+
+public class IncRepairAdminTest extends TestBaseImpl
+{
+ @Test
+ public void testManualSessionFail() throws IOException
+ {
+ repairAdminCancelHelper(true, false);
+ }
+
+ @Test
+ public void testManualSessionCancelNonCoordinatorFailure() throws
IOException
+ {
+ repairAdminCancelHelper(false, false);
+ }
+
+ @Test
+ public void testManualSessionForceCancel() throws IOException
+ {
+ repairAdminCancelHelper(false, true);
+ }
+
+ private void repairAdminCancelHelper(boolean coordinator, boolean force)
throws IOException
+ {
+ try (Cluster cluster = init(Cluster.build(3)
+ .withConfig(config ->
config.with(GOSSIP)
+
.with(NETWORK))
+ .start()))
+ {
+ boolean shouldFail = !coordinator && !force;
+ cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (k INT PRIMARY KEY, v INT)");
+
+ cluster.forEach(i -> {
+ NodeToolResult res = i.nodetoolResult("repair_admin");
+ res.asserts().stdoutContains("no sessions");
+ });
+
+ UUID uuid = makeFakeSession(cluster);
+ awaitNodetoolRepairAdminContains(cluster, uuid, "REPAIRING",
false);
+ IInvokableInstance instance = cluster.get(coordinator ? 1 : 2);
+
+ NodeToolResult res;
+ if (force)
+ {
+ res = instance.nodetoolResult("repair_admin", "cancel",
"--session", uuid.toString(), "--force");
+ }
+ else
+ {
+ res = instance.nodetoolResult("repair_admin", "cancel",
"--session", uuid.toString());
+ }
+
+ if (shouldFail)
+ {
+ res.asserts().failure();
+ // if nodetool repair_admin cancel fails, the session should still be repairing:
+ awaitNodetoolRepairAdminContains(cluster, uuid, "REPAIRING",
true);
+ }
+ else
+ {
+ res.asserts().success();
+ awaitNodetoolRepairAdminContains(cluster, uuid, "FAILED",
true);
+ }
+ }
+ }
+
+
+
+ private static void awaitNodetoolRepairAdminContains(Cluster cluster, UUID
uuid, String state, boolean all)
+ {
+ cluster.forEach(i -> {
+ while (true)
+ {
+ NodeToolResult res;
+ if (all)
+ res = i.nodetoolResult("repair_admin", "list", "--all");
+ else
+ res = i.nodetoolResult("repair_admin");
+ res.asserts().success();
+ String[] lines = res.getStdout().split("\n");
+ assertTrue(lines.length > 1);
+ for (String line : lines)
+ {
+ if (line.contains(uuid.toString()) && line.contains(state))
+ return;
+ }
+ Uninterruptibles.sleepUninterruptibly(100,
TimeUnit.MILLISECONDS);
+ }
+ });
+ }
+
+ private static UUID makeFakeSession(Cluster cluster)
+ {
+ UUID sessionId = UUIDGen.getTimeUUID();
+ InetSocketAddress coordinator =
cluster.get(1).config().broadcastAddress();
+ Set<InetSocketAddress> participants = cluster.stream()
+ .map(i ->
i.config().broadcastAddress())
+
.collect(Collectors.toSet());
+ cluster.forEach(i -> {
+ i.runOnInstance(() -> {
+ ColumnFamilyStore cfs =
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+ Range<Token> range = new
Range<>(cfs.metadata().partitioner.getMinimumToken(),
+
cfs.metadata().partitioner.getRandomToken());
+
ActiveRepairService.instance.registerParentRepairSession(sessionId,
+
InetAddressAndPort.getByAddress(coordinator.getAddress()),
+
Lists.newArrayList(cfs),
+
Sets.newHashSet(range),
+ true,
+
System.currentTimeMillis(),
+ true,
+
PreviewKind.NONE);
+ LocalSessionAccessor.prepareUnsafe(sessionId,
+
InetAddressAndPort.getByAddress(coordinator.getAddress()),
+
participants.stream().map(participant ->
InetAddressAndPort.getByAddress(participant.getAddress())).collect(Collectors.toSet()));
+ LocalSessionAccessor.setState(sessionId, REPAIRING);
+ });
+ });
+ return sessionId;
+ }
+}
diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java
index a7e8272..790f719 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java
@@ -46,18 +46,21 @@ public class LocalSessionAccessor
public static long finalizeUnsafe(UUID sessionID)
{
- LocalSession session = ARS.consistent.local.getSession(sessionID);
- assert session != null;
- session.setState(ConsistentSession.State.FINALIZED);
- ARS.consistent.local.save(session);
+ LocalSession session = setState(sessionID,
ConsistentSession.State.FINALIZED);
return session.repairedAt;
}
public static void failUnsafe(UUID sessionID)
{
- LocalSession session = ARS.consistent.local.getSession(sessionID);
+ setState(sessionID, ConsistentSession.State.FAILED);
+ }
+
+ public static LocalSession setState(UUID sessionId,
ConsistentSession.State state)
+ {
+ LocalSession session = ARS.consistent.local.getSession(sessionId);
assert session != null;
- session.setState(ConsistentSession.State.FAILED);
+ session.setState(state);
ARS.consistent.local.save(session);
+ return session;
}
}
diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
index cb420b7..80a12c0 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
@@ -42,6 +42,7 @@ import org.junit.Test;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
import org.apache.cassandra.locator.RangesAtEndpoint;
import org.apache.cassandra.net.Message;
@@ -69,6 +70,7 @@ import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;
import static org.apache.cassandra.repair.consistent.ConsistentSession.State.*;
+import static org.psjava.util.AssertStatus.assertTrue;
public class LocalSessionTest extends AbstractRepairTest
{
@@ -816,22 +818,95 @@ public class LocalSessionTest extends AbstractRepairTest
Assert.assertEquals(0, initialSessions.getNumSessions());
UUID id1 = registerSession();
UUID id2 = registerSession();
+ UUID id3 = registerSession();
initialSessions.prepareForTest(id1);
initialSessions.prepareForTest(id2);
- Assert.assertEquals(2, initialSessions.getNumSessions());
+ initialSessions.prepareForTest(id3);
+
+ Assert.assertEquals(3, initialSessions.getNumSessions());
LocalSession session1 = initialSessions.getSession(id1);
LocalSession session2 = initialSessions.getSession(id2);
-
+ LocalSession session3 = initialSessions.getSession(id3);
+ initialSessions.setStateAndSave(session2, PREPARED);
+ initialSessions.setStateAndSave(session2, REPAIRING);
+ initialSessions.setStateAndSave(session2, FINALIZE_PROMISED);
+ initialSessions.setStateAndSave(session3, PREPARED);
+ initialSessions.setStateAndSave(session3, REPAIRING);
+ initialSessions.setStateAndSave(session3, FINALIZE_PROMISED);
+ initialSessions.setStateAndSave(session3, FINALIZED);
+ Assert.assertEquals(3, initialSessions.getNumSessions());
// subsequent startups should load persisted sessions
InstrumentedLocalSessions nextSessions = new
InstrumentedLocalSessions();
Assert.assertEquals(0, nextSessions.getNumSessions());
nextSessions.start();
- Assert.assertEquals(2, nextSessions.getNumSessions());
+ Assert.assertEquals(3, nextSessions.getNumSessions());
+
+ LocalSession session1next = nextSessions.getSession(id1);
+ LocalSession session2next = nextSessions.getSession(id2);
+ LocalSession session3next = nextSessions.getSession(id3);
+
+ // non-finalized sessions should fail & notify coordinator after startup
+ assertMessagesSent(nextSessions, session1next.coordinator, new
FailSession(session1next.sessionID));
+ Assert.assertEquals(session1.sessionID, session1next.sessionID);
+ Assert.assertEquals(FAILED, session1next.getState());
+
+ Assert.assertEquals(session2, session2next);
+ Assert.assertEquals(session3, session3next);
+
+ }
- Assert.assertEquals(session1, nextSessions.getSession(id1));
- Assert.assertEquals(session2, nextSessions.getSession(id2));
+ /**
+ * Stop happy path
+ */
+ @Test
+ public void stop() throws Exception
+ {
+ InstrumentedLocalSessions initialSessions = new
InstrumentedLocalSessions();
+ initialSessions.start();
+ Assert.assertEquals(0, initialSessions.getNumSessions());
+ UUID id1 = registerSession();
+ UUID id2 = registerSession();
+ UUID id3 = registerSession();
+
+ initialSessions.prepareForTest(id1);
+ initialSessions.prepareForTest(id2);
+ initialSessions.prepareForTest(id3);
+
+ Assert.assertEquals(3, initialSessions.getNumSessions());
+ LocalSession session1 = initialSessions.getSession(id1);
+ LocalSession session2 = initialSessions.getSession(id2);
+ LocalSession session3 = initialSessions.getSession(id3);
+ initialSessions.setStateAndSave(session2, PREPARED);
+ initialSessions.setStateAndSave(session2, REPAIRING);
+ initialSessions.setStateAndSave(session2, FINALIZE_PROMISED);
+ initialSessions.setStateAndSave(session3, PREPARED);
+ initialSessions.setStateAndSave(session3, REPAIRING);
+ initialSessions.setStateAndSave(session3, FINALIZE_PROMISED);
+ initialSessions.setStateAndSave(session3, FINALIZED);
+
+ initialSessions.stop();
+ // clean shutdown should fail session1 & notify coordinator
+ assertMessagesSent(initialSessions, session1.coordinator, new
FailSession(session1.sessionID));
+
+ // subsequent startups should load persisted sessions
+ InstrumentedLocalSessions nextSessions = new
InstrumentedLocalSessions();
+ Assert.assertEquals(0, nextSessions.getNumSessions());
+ nextSessions.start();
+ Assert.assertEquals(3, nextSessions.getNumSessions());
+
+ LocalSession session1next = nextSessions.getSession(id1);
+ LocalSession session2next = nextSessions.getSession(id2);
+ LocalSession session3next = nextSessions.getSession(id3);
+
+ Assert.assertEquals(session1, session1next);
+ Assert.assertEquals(session2, session2next);
+ Assert.assertEquals(session3, session3next);
+ // clean shutdown above should make startup send no messages;
+ assertNoMessagesSent(nextSessions, session1next.coordinator);
+ assertNoMessagesSent(nextSessions, session2next.coordinator);
+ assertNoMessagesSent(nextSessions, session3next.coordinator);
}
/**
@@ -866,6 +941,8 @@ public class LocalSessionTest extends AbstractRepairTest
sessions = new LocalSessions();
sessions.start();
Assert.assertNull(sessions.getSession(session.sessionID));
+ UntypedResultSet res = QueryProcessor.executeInternal("SELECT * FROM system.repairs WHERE parent_id=?", session.sessionID);
+ assertTrue(res.isEmpty());
}
private static LocalSession sessionWithTime(int started, int updated)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]