This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new d4eba9f Abort repairs when getting a truncation request
d4eba9f is described below
commit d4eba9faa1b57fed205813a639fe53bbbdc06ef1
Author: Marcus Eriksson <[email protected]>
AuthorDate: Thu Jun 11 09:37:03 2020 +0200
Abort repairs when getting a truncation request
Patch by marcuse; reviewed by Caleb Rackliffe and David Capwell for
CASSANDRA-15854
---
CHANGES.txt | 1 +
.../org/apache/cassandra/db/ColumnFamilyStore.java | 23 ++-
.../org/apache/cassandra/repair/RepairJob.java | 3 +
.../cassandra/service/ActiveRepairService.java | 35 ++---
.../distributed/test/IncRepairTruncationTest.java | 156 +++++++++++++++++++++
.../distributed/test/PreviewRepairTest.java | 2 +-
6 files changed, 199 insertions(+), 21 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index ec95e20..0215a71 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-beta3
+ * Abort repairs when getting a truncation request (CASSANDRA-15854)
* Remove bad assert when getting active compactions for an sstable
(CASSANDRA-15457)
* Avoid failing compactions with very large partitions (CASSANDRA-15164)
* Prevent NPE in StreamMessage in type lookup (CASSANDRA-16131)
diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index cb6c86b..1fc3eba 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -79,6 +79,7 @@ import
org.apache.cassandra.repair.consistent.admin.CleanupSummary;
import org.apache.cassandra.repair.consistent.admin.PendingStat;
import org.apache.cassandra.schema.*;
import org.apache.cassandra.schema.CompactionParams.TombstoneOption;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.streaming.TableStreamManager;
@@ -90,8 +91,6 @@ import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
-import static org.apache.cassandra.utils.ExecutorUtils.awaitTermination;
-import static org.apache.cassandra.utils.ExecutorUtils.shutdown;
import static org.apache.cassandra.utils.Throwables.maybeFail;
import static org.apache.cassandra.utils.Throwables.merge;
@@ -2194,7 +2193,12 @@ public class ColumnFamilyStore implements
ColumnFamilyStoreMBean
{
public void run()
{
- logger.debug("Discarding sstable data for truncated CF +
indexes");
+ logger.info("Truncating {}.{} with truncatedAt={}",
keyspace.getName(), getTableName(), truncatedAt);
+ // since truncation can happen at different times on different
nodes, we need to make sure
+ // that any repairs are aborted, otherwise we might clear the
data on one node and then
+ // stream in data that is actually supposed to have been
deleted
+ ActiveRepairService.instance.abort((prs) ->
prs.getTableIds().contains(metadata.id),
+ "Stopping parent sessions
{} due to truncation of tableId="+metadata.id);
data.notifyTruncated(truncatedAt);
if (DatabaseDescriptor.isAutoSnapshot())
@@ -2208,6 +2212,7 @@ public class ColumnFamilyStore implements
ColumnFamilyStoreMBean
SystemKeyspace.saveTruncationRecord(ColumnFamilyStore.this,
truncatedAt, replayAfter);
logger.trace("cleaning out row cache");
invalidateCaches();
+
}
};
@@ -2625,15 +2630,25 @@ public class ColumnFamilyStore implements
ColumnFamilyStoreMBean
assert data.getCompacting().isEmpty() : data.getCompacting();
List<SSTableReader> truncatedSSTables = new ArrayList<>();
-
+ int keptSSTables = 0;
for (SSTableReader sstable : getSSTables(SSTableSet.LIVE))
{
if (!sstable.newSince(truncatedAt))
+ {
truncatedSSTables.add(sstable);
+ }
+ else
+ {
+ keptSSTables++;
+ logger.info("Truncation is keeping {} maxDataAge={}
truncatedAt={}", sstable, sstable.maxDataAge, truncatedAt);
+ }
}
if (!truncatedSSTables.isEmpty())
+ {
+ logger.info("Truncation is dropping {} sstables and keeping {} due
to sstable.maxDataAge > truncatedAt", truncatedSSTables.size(), keptSSTables);
markObsolete(truncatedSSTables, OperationType.UNKNOWN);
+ }
}
public double getDroppableTombstoneRatio()
diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java
b/src/java/org/apache/cassandra/repair/RepairJob.java
index 16eb325..341b0fd 100644
--- a/src/java/org/apache/cassandra/repair/RepairJob.java
+++ b/src/java/org/apache/cassandra/repair/RepairJob.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.repair.asymmetric.HostDifferences;
import org.apache.cassandra.repair.asymmetric.PreferedNodeFilter;
import org.apache.cassandra.repair.asymmetric.ReduceHelper;
import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.service.ActiveRepairService;
import org.apache.cassandra.streaming.PreviewKind;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
@@ -274,6 +275,8 @@ public class RepairJob extends AbstractFuture<RepairResult>
implements Runnable
@VisibleForTesting
ListenableFuture<List<SyncStat>> executeTasks(List<SyncTask> syncTasks)
{
+ // this throws if the parent session has failed
+
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId);
for (SyncTask task : syncTasks)
{
if (!task.isLocal())
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java
b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 3b13907..f7b0686 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -23,6 +23,7 @@ import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.openmbean.CompositeData;
+import java.util.function.Predicate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -780,22 +781,7 @@ public class ActiveRepairService implements
IEndpointStateChangeSubscriber, IFai
if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold() ||
parentRepairSessions.isEmpty())
return;
- Set<UUID> toRemove = new HashSet<>();
-
- for (Map.Entry<UUID, ParentRepairSession> repairSessionEntry :
parentRepairSessions.entrySet())
- {
- if (repairSessionEntry.getValue().coordinator.equals(ep))
- {
- toRemove.add(repairSessionEntry.getKey());
- }
- }
-
- if (!toRemove.isEmpty())
- {
- logger.debug("Removing {} in parent repair sessions", toRemove);
- for (UUID id : toRemove)
- removeParentRepairSession(id);
- }
+ abort((prs) -> prs.coordinator.equals(ep), "Removing {} in parent
repair sessions");
}
public int getRepairPendingCompactionRejectThreshold()
@@ -808,4 +794,21 @@ public class ActiveRepairService implements
IEndpointStateChangeSubscriber, IFai
DatabaseDescriptor.setRepairPendingCompactionRejectThreshold(value);
}
+ /**
+ * Remove any parent repair sessions matching predicate
+ */
+ public void abort(Predicate<ParentRepairSession> predicate, String message)
+ {
+ Set<UUID> parentSessionsToRemove = new HashSet<>();
+ for (Map.Entry<UUID, ParentRepairSession> repairSessionEntry :
parentRepairSessions.entrySet())
+ {
+ if (predicate.test(repairSessionEntry.getValue()))
+ parentSessionsToRemove.add(repairSessionEntry.getKey());
+ }
+ if (!parentSessionsToRemove.isEmpty())
+ {
+ logger.info(message, parentSessionsToRemove);
+ parentSessionsToRemove.forEach(this::removeParentRepairSession);
+ }
+ }
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/IncRepairTruncationTest.java
b/test/distributed/org/apache/cassandra/distributed/test/IncRepairTruncationTest.java
new file mode 100644
index 0000000..bd47906
--- /dev/null
+++
b/test/distributed/org/apache/cassandra/distributed/test/IncRepairTruncationTest.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.Test;
+
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.IMessage;
+import org.apache.cassandra.distributed.api.IMessageFilters;
+import org.apache.cassandra.distributed.api.NodeToolResult;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.utils.concurrent.SimpleCondition;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.test.PreviewRepairTest.insert;
+
+public class IncRepairTruncationTest extends TestBaseImpl
+{
+ @Test
+ public void testTruncateDuringIncRepair() throws IOException,
InterruptedException, ExecutionException
+ {
+ ExecutorService es = Executors.newFixedThreadPool(3);
+ try(Cluster cluster = init(Cluster.build(2)
+ .withConfig(config ->
config.set("disable_incremental_repair", false)
+
.with(GOSSIP)
+
.with(NETWORK))
+ .start()))
+ {
+ cluster.schemaChange("create table " + KEYSPACE + ".tbl (id int
primary key, t int)");
+
+ insert(cluster.coordinator(1), 0, 100);
+ cluster.forEach((node) -> node.flush(KEYSPACE));
+ // mark everything repaired
+ cluster.get(1).nodetoolResult("repair", KEYSPACE,
"tbl").asserts().success();
+
+ /*
+ make sure we are out-of-sync to make node2 stream data to node1:
+ */
+ cluster.get(2).executeInternal("insert into "+KEYSPACE+".tbl (id,
t) values (5, 5)");
+ cluster.get(2).flush(KEYSPACE);
+ /*
+ start repair:
+ block streaming from 2 -> 1 until truncation below has executed
+ */
+ BlockMessage node2Streaming = new BlockMessage();
+
cluster.filters().inbound().verbs(Verb.VALIDATION_RSP.id).from(2).to(1).messagesMatching(node2Streaming).drop();
+
+ /*
+ block truncation on node2:
+ */
+ BlockMessage node2Truncation = new BlockMessage();
+
cluster.filters().inbound().verbs(Verb.TRUNCATE_REQ.id).from(1).to(2).messagesMatching(node2Truncation).drop();
+
+ Future<NodeToolResult> repairResult = es.submit(() ->
cluster.get(1).nodetoolResult("repair", KEYSPACE, "tbl"));
+
+ Future<?> truncationFuture = es.submit(() -> {
+ try
+ {
+ /*
+ wait for streaming message to be sent before truncating, to
make sure we have a mismatch to make us stream later
+ */
+ node2Streaming.gotMessage.await();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ cluster.coordinator(1).execute("TRUNCATE "+KEYSPACE+".tbl",
ConsistencyLevel.ALL);
+ });
+
+ node2Truncation.gotMessage.await();
+ // make sure node1 finishes truncation, removing its files
+ cluster.get(1).runOnInstance(() -> {
+ ColumnFamilyStore cfs =
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+ while (!cfs.getLiveSSTables().isEmpty())
+ Uninterruptibles.sleepUninterruptibly(100,
TimeUnit.MILLISECONDS);
+ });
+
+ /* let repair finish, streaming files from 2 -> 1 */
+ node2Streaming.allowMessage.signalAll();
+
+ /* and the repair should fail: */
+ repairResult.get().asserts().failure();
+
+ /*
+ and let truncation finish on node2
+ */
+ node2Truncation.allowMessage.signalAll();
+ truncationFuture.get();
+
+ /* wait for truncation to remove files on node2 */
+ cluster.get(2).runOnInstance(() -> {
+ ColumnFamilyStore cfs =
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl");
+ while (!cfs.getLiveSSTables().isEmpty())
+ {
+ System.out.println(cfs.getLiveSSTables());
+ Uninterruptibles.sleepUninterruptibly(100,
TimeUnit.MILLISECONDS);
+ }
+ });
+
+ cluster.get(1).nodetoolResult("repair", "-vd", KEYSPACE,
"tbl").asserts().success().notificationContains("Repair preview completed
successfully");
+ }
+ finally
+ {
+ es.shutdown();
+ }
+ }
+
+ private static class BlockMessage implements IMessageFilters.Matcher
+ {
+ private final SimpleCondition gotMessage = new SimpleCondition();
+ private final SimpleCondition allowMessage = new SimpleCondition();
+
+ public boolean matches(int from, int to, IMessage message)
+ {
+ gotMessage.signalAll();
+ try
+ {
+ allowMessage.await();
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ return false;
+ }
+ }
+}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
index 7c306c0..18e6d43 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/PreviewRepairTest.java
@@ -323,7 +323,7 @@ public class PreviewRepairTest extends TestBaseImpl
}
}
- private static void insert(ICoordinator coordinator, int start, int count)
+ static void insert(ICoordinator coordinator, int start, int count)
{
insert(coordinator, start, count, "tbl");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]