This is an automated email from the ASF dual-hosted git repository.

mck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5c84ed9  Send FAILED_SESSION_MSG on shutdown and on in-progress repairs during startup
5c84ed9 is described below

commit 5c84ed9ae8227e550630768a47fef7b2d1f1f1d7
Author: Marcus Eriksson <[email protected]>
AuthorDate: Fri Feb 5 10:20:15 2021 +0100

    Send FAILED_SESSION_MSG on shutdown and on in-progress repairs during startup
    
     patch by Marcus Eriksson; reviewed by Adam Holmberg, Chris Lohfink, Mick Semb Wever for CASSANDRA-16425
---
 CHANGES.txt                                        |   1 +
 .../cassandra/repair/consistent/LocalSessions.java |  46 +++++-
 .../cassandra/service/ActiveRepairService.java     |   5 +
 .../apache/cassandra/service/StorageService.java   |   1 +
 .../distributed/test/IncRepairAdminTest.java       | 167 +++++++++++++++++++++
 .../repair/consistent/LocalSessionAccessor.java    |  15 +-
 .../repair/consistent/LocalSessionTest.java        |  87 ++++++++++-
 7 files changed, 306 insertions(+), 16 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 6370760..2c834d2 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-beta5
+ * Send FAILED_SESSION_MSG on shutdown and on in-progress repairs during startup (CASSANDRA-16425)
  * Reinstate removed ApplicationState padding (CASSANDRA-16484)
  * Expose data dirs to ColumnFamilyStoreMBean (CASSANDRA-16335)
  * Add possibility to copy SSTables in SSTableImporter instead of moving them (CASSANDRA-16407)
diff --git a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
index 0c65ba0..e6ca3ee 100644
--- a/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
+++ b/src/java/org/apache/cassandra/repair/consistent/LocalSessions.java
@@ -351,13 +351,44 @@ public class LocalSessions
             }
             catch (IllegalArgumentException | NullPointerException e)
             {
-                logger.warn("Unable to load malformed repair session {}, ignoring", row.has("parent_id") ? row.getUUID("parent_id") : null);
+                logger.warn("Unable to load malformed repair session {}, removing", row.has("parent_id") ? row.getUUID("parent_id") : null);
+                if (row.has("parent_id"))
+                    deleteRow(row.getUUID("parent_id"));
             }
         }
         sessions = ImmutableMap.copyOf(loadedSessions);
+        failOngoingRepairs();
         started = true;
     }
 
+    public synchronized void stop()
+    {
+        if (!started)
+            return;
+        started = false;
+        failOngoingRepairs();
+    }
+
+    private void failOngoingRepairs()
+    {
+        for (LocalSession session : sessions.values())
+        {
+            synchronized (session)
+            {
+                switch (session.getState())
+                {
+                    case FAILED:
+                    case FINALIZED:
+                    case FINALIZE_PROMISED:
+                        continue;
+                    default:
+                        logger.info("Found repair session {} with state = {} - failing the repair", session.sessionID, session.getState());
+                        failSession(session, true);
+                }
+            }
+        }
+    }
+
     public boolean isStarted()
     {
         return started;
@@ -642,7 +673,8 @@ public class LocalSessions
         MessagingService.instance().send(message, destination);
     }
 
-    private void setStateAndSave(LocalSession session, ConsistentSession.State state)
+    @VisibleForTesting
+    void setStateAndSave(LocalSession session, ConsistentSession.State state)
     {
         synchronized (session)
         {
@@ -671,20 +703,24 @@ public class LocalSessions
 
     public void failSession(UUID sessionID, boolean sendMessage)
     {
-        LocalSession session = getSession(sessionID);
+        failSession(getSession(sessionID), sendMessage);
+    }
+
+    public void failSession(LocalSession session, boolean sendMessage)
+    {
         if (session != null)
         {
             synchronized (session)
             {
                 if (session.getState() != FAILED)
                 {
-                    logger.info("Failing local repair session {}", sessionID);
+                    logger.info("Failing local repair session {}", session.sessionID);
                     setStateAndSave(session, FAILED);
                 }
             }
             if (sendMessage)
             {
-                sendMessage(session.coordinator, Message.out(FAILED_SESSION_MSG, new FailSession(sessionID)));
+                sendMessage(session.coordinator, Message.out(FAILED_SESSION_MSG, new FailSession(session.sessionID)));
             }
         }
     }
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 58587be..165582b 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -221,6 +221,11 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
                                                              TimeUnit.SECONDS);
     }
 
+    public void stop()
+    {
+        consistent.local.stop();
+    }
+
     @Override
     public List<Map<String, String>> getSessions(boolean all, String rangesStr)
     {
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index a8ec34d..8c5437c 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -4848,6 +4848,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 shutdownClientServers();
             ScheduledExecutors.optionalTasks.shutdown();
             Gossiper.instance.stop();
+            ActiveRepairService.instance.stop();
 
             if (!isFinalShutdown)
                 setMode(Mode.DRAINING, "shutting down MessageService", false);
diff --git a/test/distributed/org/apache/cassandra/distributed/test/IncRepairAdminTest.java b/test/distributed/org/apache/cassandra/distributed/test/IncRepairAdminTest.java
new file mode 100644
index 0000000..236c819
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/IncRepairAdminTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Test;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.repair.consistent.LocalSessionAccessor;
+import org.apache.cassandra.service.ActiveRepairService;
+import org.apache.cassandra.streaming.PreviewKind;
+import org.apache.cassandra.utils.UUIDGen;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.repair.consistent.ConsistentSession.State.REPAIRING;
+import static org.junit.Assert.assertTrue;
+
+public class IncRepairAdminTest extends TestBaseImpl
+{
+    @Test
+    public void testManualSessionFail() throws IOException
+    {
+        repairAdminCancelHelper(true, false);
+    }
+
+    @Test
+    public void testManualSessionCancelNonCoordinatorFailure() throws IOException
+    {
+        repairAdminCancelHelper(false, false);
+    }
+
+    @Test
+    public void testManualSessionForceCancel() throws IOException
+    {
+        repairAdminCancelHelper(false, true);
+    }
+
+    private void repairAdminCancelHelper(boolean coordinator, boolean force) throws IOException
+    {
+        try (Cluster cluster = init(Cluster.build(3)
+                                           .withConfig(config -> config.with(GOSSIP)
+                                                                       .with(NETWORK))
+                                           .start()))
+        {
+            boolean shouldFail = !coordinator && !force;
+            cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (k INT PRIMARY KEY, v INT)");
+
+            cluster.forEach(i -> {
+                NodeToolResult res = i.nodetoolResult("repair_admin");
+                res.asserts().stdoutContains("no sessions");
+            });
+
+            UUID uuid = makeFakeSession(cluster);
+            awaitNodetoolRepairAdminContains(cluster, uuid, "REPAIRING", false);
+            IInvokableInstance instance = cluster.get(coordinator ? 1 : 2);
+
+            NodeToolResult res;
+            if (force)
+            {
+                res = instance.nodetoolResult("repair_admin", "cancel", "--session", uuid.toString(), "--force");
+            }
+            else
+            {
+                res = instance.nodetoolResult("repair_admin", "cancel", "--session", uuid.toString());
+            }
+
+            if (shouldFail)
+            {
+                res.asserts().failure();
+                // if nodetool repair_admin cancel fails, the session should still be repairing:
+                awaitNodetoolRepairAdminContains(cluster, uuid, "REPAIRING", true);
+            }
+            else
+            {
+                res.asserts().success();
+                awaitNodetoolRepairAdminContains(cluster, uuid, "FAILED", true);
+            }
+        }
+    }
+
+
+
+    private static void awaitNodetoolRepairAdminContains(Cluster cluster, UUID uuid, String state, boolean all)
+    {
+        cluster.forEach(i -> {
+            while (true)
+            {
+                NodeToolResult res;
+                if (all)
+                    res = i.nodetoolResult("repair_admin", "list", "--all");
+                else
+                    res = i.nodetoolResult("repair_admin");
+                res.asserts().success();
+                String[] lines = res.getStdout().split("\n");
+                assertTrue(lines.length > 1);
+                for (String line : lines)
+                {
+                    if (line.contains(uuid.toString()) && line.contains(state))
+                        return;
+                }
+                Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
+            }
+        });
+    }
+
+    private static UUID makeFakeSession(Cluster cluster)
+    {
+        UUID sessionId = UUIDGen.getTimeUUID();
+        InetSocketAddress coordinator = cluster.get(1).config().broadcastAddress();
+        Set<InetSocketAddress> participants = cluster.stream()
+                                                     .map(i -> i.config().broadcastAddress())
+                                                     .collect(Collectors.toSet());
+        cluster.forEach(i -> {
+            i.runOnInstance(() -> {
+                ColumnFamilyStore cfs = Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+                Range<Token> range = new Range<>(cfs.metadata().partitioner.getMinimumToken(),
+                                                 cfs.metadata().partitioner.getRandomToken());
+                ActiveRepairService.instance.registerParentRepairSession(sessionId,
+                                                                         InetAddressAndPort.getByAddress(coordinator.getAddress()),
+                                                                         Lists.newArrayList(cfs),
+                                                                         Sets.newHashSet(range),
+                                                                         true,
+                                                                         System.currentTimeMillis(),
+                                                                         true,
+                                                                         PreviewKind.NONE);
+                LocalSessionAccessor.prepareUnsafe(sessionId,
+                                                   InetAddressAndPort.getByAddress(coordinator.getAddress()),
+                                                   participants.stream().map(participant -> InetAddressAndPort.getByAddress(participant.getAddress())).collect(Collectors.toSet()));
+                LocalSessionAccessor.setState(sessionId, REPAIRING);
+            });
+        });
+        return sessionId;
+    }
+}
diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java
index a7e8272..790f719 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionAccessor.java
@@ -46,18 +46,21 @@ public class LocalSessionAccessor
 
     public static long finalizeUnsafe(UUID sessionID)
     {
-        LocalSession session = ARS.consistent.local.getSession(sessionID);
-        assert session != null;
-        session.setState(ConsistentSession.State.FINALIZED);
-        ARS.consistent.local.save(session);
+        LocalSession session = setState(sessionID, ConsistentSession.State.FINALIZED);
         return session.repairedAt;
     }
 
     public static void failUnsafe(UUID sessionID)
     {
-        LocalSession session = ARS.consistent.local.getSession(sessionID);
+        setState(sessionID, ConsistentSession.State.FAILED);
+    }
+
+    public static LocalSession setState(UUID sessionId, ConsistentSession.State state)
+    {
+        LocalSession session = ARS.consistent.local.getSession(sessionId);
         assert session != null;
-        session.setState(ConsistentSession.State.FAILED);
+        session.setState(state);
         ARS.consistent.local.save(session);
+        return session;
     }
 }
diff --git a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
index cb420b7..80a12c0 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/LocalSessionTest.java
@@ -42,6 +42,7 @@ import org.junit.Test;
 
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.cql3.QueryProcessor;
+import org.apache.cassandra.cql3.UntypedResultSet;
 import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
 import org.apache.cassandra.locator.RangesAtEndpoint;
 import org.apache.cassandra.net.Message;
@@ -69,6 +70,7 @@ import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.UUIDGen;
 
 import static org.apache.cassandra.repair.consistent.ConsistentSession.State.*;
+import static org.junit.Assert.assertTrue;
 
 public class LocalSessionTest extends AbstractRepairTest
 {
@@ -816,22 +818,95 @@ public class LocalSessionTest extends AbstractRepairTest
         Assert.assertEquals(0, initialSessions.getNumSessions());
         UUID id1 = registerSession();
         UUID id2 = registerSession();
+        UUID id3 = registerSession();
 
         initialSessions.prepareForTest(id1);
         initialSessions.prepareForTest(id2);
-        Assert.assertEquals(2, initialSessions.getNumSessions());
+        initialSessions.prepareForTest(id3);
+
+        Assert.assertEquals(3, initialSessions.getNumSessions());
         LocalSession session1 = initialSessions.getSession(id1);
         LocalSession session2 = initialSessions.getSession(id2);
-
+        LocalSession session3 = initialSessions.getSession(id3);
+        initialSessions.setStateAndSave(session2, PREPARED);
+        initialSessions.setStateAndSave(session2, REPAIRING);
+        initialSessions.setStateAndSave(session2, FINALIZE_PROMISED);
+        initialSessions.setStateAndSave(session3, PREPARED);
+        initialSessions.setStateAndSave(session3, REPAIRING);
+        initialSessions.setStateAndSave(session3, FINALIZE_PROMISED);
+        initialSessions.setStateAndSave(session3, FINALIZED);
+        Assert.assertEquals(3, initialSessions.getNumSessions());
 
         // subsequent startups should load persisted sessions
         InstrumentedLocalSessions nextSessions = new InstrumentedLocalSessions();
         Assert.assertEquals(0, nextSessions.getNumSessions());
         nextSessions.start();
-        Assert.assertEquals(2, nextSessions.getNumSessions());
+        Assert.assertEquals(3, nextSessions.getNumSessions());
+
+        LocalSession session1next = nextSessions.getSession(id1);
+        LocalSession session2next = nextSessions.getSession(id2);
+        LocalSession session3next = nextSessions.getSession(id3);
+
+        // non-finalized sessions should fail & notify coordinator after startup
+        assertMessagesSent(nextSessions, session1next.coordinator, new FailSession(session1next.sessionID));
+        Assert.assertEquals(session1.sessionID, session1next.sessionID);
+        Assert.assertEquals(FAILED, session1next.getState());
+
+        Assert.assertEquals(session2, session2next);
+        Assert.assertEquals(session3, session3next);
+
+    }
 
-        Assert.assertEquals(session1, nextSessions.getSession(id1));
-        Assert.assertEquals(session2, nextSessions.getSession(id2));
+    /**
+     * Stop happy path
+     */
+    @Test
+    public void stop() throws Exception
+    {
+        InstrumentedLocalSessions initialSessions = new InstrumentedLocalSessions();
+        initialSessions.start();
+        Assert.assertEquals(0, initialSessions.getNumSessions());
+        UUID id1 = registerSession();
+        UUID id2 = registerSession();
+        UUID id3 = registerSession();
+
+        initialSessions.prepareForTest(id1);
+        initialSessions.prepareForTest(id2);
+        initialSessions.prepareForTest(id3);
+
+        Assert.assertEquals(3, initialSessions.getNumSessions());
+        LocalSession session1 = initialSessions.getSession(id1);
+        LocalSession session2 = initialSessions.getSession(id2);
+        LocalSession session3 = initialSessions.getSession(id3);
+        initialSessions.setStateAndSave(session2, PREPARED);
+        initialSessions.setStateAndSave(session2, REPAIRING);
+        initialSessions.setStateAndSave(session2, FINALIZE_PROMISED);
+        initialSessions.setStateAndSave(session3, PREPARED);
+        initialSessions.setStateAndSave(session3, REPAIRING);
+        initialSessions.setStateAndSave(session3, FINALIZE_PROMISED);
+        initialSessions.setStateAndSave(session3, FINALIZED);
+
+        initialSessions.stop();
+        // clean shutdown should fail session1 & notify coordinator
+        assertMessagesSent(initialSessions, session1.coordinator, new FailSession(session1.sessionID));
+
+        // subsequent startups should load persisted sessions
+        InstrumentedLocalSessions nextSessions = new InstrumentedLocalSessions();
+        Assert.assertEquals(0, nextSessions.getNumSessions());
+        nextSessions.start();
+        Assert.assertEquals(3, nextSessions.getNumSessions());
+
+        LocalSession session1next = nextSessions.getSession(id1);
+        LocalSession session2next = nextSessions.getSession(id2);
+        LocalSession session3next = nextSessions.getSession(id3);
+
+        Assert.assertEquals(session1, session1next);
+        Assert.assertEquals(session2, session2next);
+        Assert.assertEquals(session3, session3next);
+        // clean shutdown above should make startup send no messages;
+        assertNoMessagesSent(nextSessions, session1next.coordinator);
+        assertNoMessagesSent(nextSessions, session2next.coordinator);
+        assertNoMessagesSent(nextSessions, session3next.coordinator);
     }
 
     /**
@@ -866,6 +941,8 @@ public class LocalSessionTest extends AbstractRepairTest
         sessions = new LocalSessions();
         sessions.start();
         Assert.assertNull(sessions.getSession(session.sessionID));
+        UntypedResultSet res = QueryProcessor.executeInternal("SELECT * FROM system.repairs WHERE parent_id=?", session.sessionID);
+        assertTrue(res.isEmpty());
     }
 
     private static LocalSession sessionWithTime(int started, int updated)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to