This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d4eba9f  Abort repairs when getting a truncation request
d4eba9f is described below

commit d4eba9faa1b57fed205813a639fe53bbbdc06ef1
Author: Marcus Eriksson <[email protected]>
AuthorDate: Thu Jun 11 09:37:03 2020 +0200

    Abort repairs when getting a truncation request
    
    Patch by marcuse; reviewed by Caleb Rackliffe and David Capwell for CASSANDRA-15854
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  23 ++-
 .../org/apache/cassandra/repair/RepairJob.java     |   3 +
 .../cassandra/service/ActiveRepairService.java     |  35 ++---
 .../distributed/test/IncRepairTruncationTest.java  | 156 +++++++++++++++++++++
 .../distributed/test/PreviewRepairTest.java        |   2 +-
 6 files changed, 199 insertions(+), 21 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index ec95e20..0215a71 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-beta3
+ * Abort repairs when getting a truncation request (CASSANDRA-15854)
  * Remove bad assert when getting active compactions for an sstable (CASSANDRA-15457)
  * Avoid failing compactions with very large partitions (CASSANDRA-15164)
  * Prevent NPE in StreamMessage in type lookup (CASSANDRA-16131)
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index cb6c86b..1fc3eba 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -79,6 +79,7 @@ import org.apache.cassandra.repair.consistent.admin.CleanupSummary;
 import org.apache.cassandra.repair.consistent.admin.PendingStat;
 import org.apache.cassandra.schema.*;
 import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.service.CacheService;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.streaming.TableStreamManager;
@@ -90,8 +91,6 @@ import org.json.simple.JSONArray;
 import org.json.simple.JSONObject;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
-import static org.apache.cassandra.utils.ExecutorUtils.shutdown;
 import static org.apache.cassandra.utils.Throwables.maybeFail;
 import static org.apache.cassandra.utils.Throwables.merge;
 
@@ -2194,7 +2193,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         {
             public void run()
             {
-                logger.debug("Discarding sstable data for truncated CF + 
indexes");
+                logger.info("Truncating {}.{} with truncatedAt={}", 
keyspace.getName(), getTableName(), truncatedAt);
+                // since truncation can happen at different times on different 
nodes, we need to make sure
+                // that any repairs are aborted, otherwise we might clear the 
data on one node and then
+                // stream in data that is actually supposed to have been 
deleted
+                ActiveRepairService.instance.abort((prs) -> 
prs.getTableIds().contains(metadata.id),
+                                                   "Stopping parent sessions 
{} due to truncation of tableId="+metadata.id);
                 data.notifyTruncated(truncatedAt);
 
                 if (DatabaseDescriptor.isAutoSnapshot())
@@ -2208,6 +2212,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
                 SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this, 
truncatedAt, replayAfter);
                 logger.trace("cleaning out row cache");
                 invalidateCaches();
+
             }
         };
 
@@ -2625,15 +2630,25 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
         assert data.getCompacting().isEmpty() : data.getCompacting();
 
         List<SSTableReader> truncatedSSTables = new ArrayList<>();
-
+        int keptSSTables = 0;
         for (SSTableReader sstable : getSSTables(SSTableSet.LIVE))
         {
             if (!sstable.newSince(truncatedAt))
+            {
                 truncatedSSTables.add(sstable);
+            }
+            else
+            {
+                keptSSTables++;
+                logger.info("Truncation is keeping {} maxDataAge={} 
truncatedAt={}", sstable, sstable.maxDataAge, truncatedAt);
+            }
         }
 
         if (!truncatedSSTables.isEmpty())
+        {
+            logger.info("Truncation is dropping {} sstables and keeping {} due 
to sstable.maxDataAge > truncatedAt", truncatedSSTables.size(), keptSSTables);
             markObsolete(truncatedSSTables, OperationType.UNKNOWN);
+        }
     }
 
     public double getDroppableTombstoneRatio()
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java
index 16eb325..341b0fd 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.repair.asymmetric.HostDifferences;
 import org.apache.cassandra.repair.asymmetric.PreferedNodeFilter;
 import org.apache.cassandra.repair.asymmetric.ReduceHelper;
 import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.ActiveRepairService;
 import org.apache.cassandra.streaming.PreviewKind;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
@@ -274,6 +275,8 @@ public class RepairJob extends AbstractFuture<RepairResult> implements Runnable
     @VisibleForTesting
     ListenableFuture<List<SyncStat>> executeTasks(List<SyncTask> syncTasks)
     {
+        // this throws if the parent session has failed
+        
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId);
         for (SyncTask task : syncTasks)
         {
             if (!task.isLocal())
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 3b13907..f7b0686 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -23,6 +23,7 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.management.openmbean.CompositeData;
+import java.util.function.Predicate;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -780,22 +781,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold() || 
parentRepairSessions.isEmpty())
             return;
 
-        Set<UUID> toRemove = new HashSet<>();
-
-        for (Map.Entry<UUID, ParentRepairSession> repairSessionEntry : 
parentRepairSessions.entrySet())
-        {
-            if (repairSessionEntry.getValue().coordinator.equals(ep))
-            {
-                toRemove.add(repairSessionEntry.getKey());
-            }
-        }
-
-        if (!toRemove.isEmpty())
-        {
-            logger.debug("Removing {} in parent repair sessions", toRemove);
-            for (UUID id : toRemove)
-                removeParentRepairSession(id);
-        }
+        abort((prs) -> prs.coordinator.equals(ep), "Removing {} in parent 
repair sessions");
     }
 
     public int getRepairPendingCompactionRejectThreshold()
@@ -808,4 +794,21 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         DatabaseDescriptor.setRepairPendingCompactionRejectThreshold(value);
     }
 
+    /**
+     * Remove any parent repair sessions matching predicate
+     */
+    public void abort(Predicate<ParentRepairSession> predicate, String message)
+    {
+        Set<UUID> parentSessionsToRemove = new HashSet<>();
+        for (Map.Entry<UUID, ParentRepairSession> repairSessionEntry : 
parentRepairSessions.entrySet())
+        {
+            if (predicate.test(repairSessionEntry.getValue()))
+                parentSessionsToRemove.add(repairSessionEntry.getKey());
+        }
+        if (!parentSessionsToRemove.isEmpty())
+        {
+            logger.info(message, parentSessionsToRemove);
+            parentSessionsToRemove.forEach(this::removeParentRepairSession);
+        }
+    }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/IncRepairTruncationTest.java b/test/distributed/org/apache/cassandra/distributed/test/IncRepairTruncationTest.java
new file mode 100644
index 0000000..bd47906
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/IncRepairTruncationTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Test;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IMessage;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.test.PreviewRepairTest.insert;
+
+public class IncRepairTruncationTest extends TestBaseImpl
+{
+    @Test
+    public void testTruncateDuringIncRepair() throws IOException, 
InterruptedException, ExecutionException
+    {
+        ExecutorService es = Executors.newFixedThreadPool(3);
+        try(Cluster cluster = init(Cluster.build(2)
+                                          .withConfig(config -> 
config.set("disable_incremental_repair", false)
+                                                                      
.with(GOSSIP)
+                                                                      
.with(NETWORK))
+                                          .start()))
+        {
+            cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int 
primary key, t int)");
+
+            insert(cluster.coordinator(1), 0, 100);
+            cluster.forEach((node) -> node.flush(KEYSPACE));
+            // mark everything repaired
+            cluster.get(1).nodetoolResult("repair", KEYSPACE, 
"tbl").asserts().success();
+
+            /*
+            make sure we are out-of-sync to make node2 stream data to node1:
+             */
+            cluster.get(2).executeInternal("insert into "+KEYSPACE+".tbl (id, 
t) values (5, 5)");
+            cluster.get(2).flush(KEYSPACE);
+            /*
+            start repair:
+            block streaming from 2 -> 1 until truncation below has executed
+             */
+            BlockMessage node2Streaming = new BlockMessage();
+            
cluster.filters().inbound().verbs(Verb.VALIDATION_RSP.id).from(2).to(1).messagesMatching(node2Streaming).drop();
+
+            /*
+            block truncation on node2:
+             */
+            BlockMessage node2Truncation = new BlockMessage();
+            
cluster.filters().inbound().verbs(Verb.TRUNCATE_REQ.id).from(1).to(2).messagesMatching(node2Truncation).drop();
+
+            Future<NodeToolResult> repairResult = es.submit(() -> 
cluster.get(1).nodetoolResult("repair", KEYSPACE, "tbl"));
+
+            Future<?> truncationFuture = es.submit(() -> {
+                try
+                {
+                    /*
+                    wait for streaming message to sent before truncating, to 
make sure we have a mismatch to make us stream later
+                     */
+                    node2Streaming.gotMessage.await();
+                }
+                catch (InterruptedException e)
+                {
+                    throw new RuntimeException(e);
+                }
+                cluster.coordinator(1).execute("TRUNCATE "+KEYSPACE+".tbl", 
ConsistencyLevel.ALL);
+            });
+
+            node2Truncation.gotMessage.await();
+            // make sure node1 finishes truncation, removing its files
+            cluster.get(1).runOnInstance(() -> {
+                ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+                while (!cfs.getLiveSSTables().isEmpty())
+                    Uninterruptibles.sleepUninterruptibly(100, 
TimeUnit.MILLISECONDS);
+            });
+
+            /* let repair finish, streaming files from 2 -> 1 */
+            node2Streaming.allowMessage.signalAll();
+
+            /* and the repair should fail: */
+            repairResult.get().asserts().failure();
+
+            /*
+            and let truncation finish on node2
+             */
+            node2Truncation.allowMessage.signalAll();
+            truncationFuture.get();
+
+            /* wait for truncation to remove files on node2 */
+            cluster.get(2).runOnInstance(() -> {
+                ColumnFamilyStore cfs = 
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+                while (!cfs.getLiveSSTables().isEmpty())
+                {
+                    System.out.println(cfs.getLiveSSTables());
+                    Uninterruptibles.sleepUninterruptibly(100, 
TimeUnit.MILLISECONDS);
+                }
+            });
+
+            cluster.get(1).nodetoolResult("repair", "-vd", KEYSPACE, 
"tbl").asserts().success().notificationContains("Repair preview completed 
successfully");
+        }
+        finally
+        {
+            es.shutdown();
+        }
+    }
+
+    private static class BlockMessage implements IMessageFilters.Matcher
+    {
+        private final SimpleCondition gotMessage = new SimpleCondition();
+        private final SimpleCondition allowMessage = new SimpleCondition();
+
+        public boolean matches(int from, int to, IMessage message)
+        {
+            gotMessage.signalAll();
+            try
+            {
+                allowMessage.await();
+            }
+            catch (InterruptedException e)
+            {
+                throw new RuntimeException(e);
+            }
+            return false;
+        }
+    }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
index 7c306c0..18e6d43 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
@@ -323,7 +323,7 @@ public class PreviewRepairTest extends TestBaseImpl
         }
     }
 
-    private static void insert(ICoordinator coordinator, int start, int count)
+    static void insert(ICoordinator coordinator, int start, int count)
     {
         insert(coordinator, start, count, "tbl");
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to