This is an automated email from the ASF dual-hosted git repository.
adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 23512cf Prevent parent repair sessions leak
23512cf is described below
commit 23512cf3da5e8206d8797841f2238cdd86c13d96
Author: jtgrabowski <[email protected]>
AuthorDate: Wed Feb 24 18:40:22 2021 +0000
Prevent parent repair sessions leak
patch by Jaroslaw Grabowski and Berenguer Blasi; reviewed by Ekaterina
Dimitrova and Andrés de la Peña for CASSANDRA-16446
Co-authored-by: jtgrabowski <[email protected]>
Co-authored-by: Bereng <[email protected]>
---
CHANGES.txt | 1 +
.../apache/cassandra/repair/RepairRunnable.java | 42 +++++++++++++++---
.../cassandra/service/ActiveRepairService.java | 51 ++++++++++++++++++++++
.../service/ActiveRepairServiceMBean.java | 9 ++++
4 files changed, 98 insertions(+), 5 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index dbfa272..8be989c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.0-beta5
+ * Prevent parent repair sessions leak (CASSANDRA-16446)
* Fix timestamp issue in SinglePartitionSliceCommandTest
testPartitionDeletionRowDeletionTie (CASSANDRA-16443)
* Promote protocol V5 out of beta (CASSANDRA-14973)
* Fix incorrect encoding for strings can be UTF8 (CASSANDRA-16429)
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java
b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 5d8e945..793d2f2 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -393,15 +393,29 @@ public class RepairRunnable implements Runnable,
ProgressEventNotifier
{
if (options.isPreview())
{
- previewRepair(parentSession, creationTimeMillis,
neighborsAndRanges.filterCommonRanges(keyspace, cfnames), cfnames);
+ previewRepair(parentSession,
+ creationTimeMillis,
+ neighborsAndRanges.filterCommonRanges(keyspace,
cfnames),
+ neighborsAndRanges.participants,
+ cfnames);
}
else if (options.isIncremental())
{
- incrementalRepair(parentSession, creationTimeMillis, traceState,
neighborsAndRanges, cfnames);
+ incrementalRepair(parentSession,
+ creationTimeMillis,
+ traceState,
+ neighborsAndRanges,
+ neighborsAndRanges.participants,
+ cfnames);
}
else
{
- normalRepair(parentSession, creationTimeMillis, traceState,
neighborsAndRanges.filterCommonRanges(keyspace, cfnames), cfnames);
+ normalRepair(parentSession,
+ creationTimeMillis,
+ traceState,
+ neighborsAndRanges.filterCommonRanges(keyspace,
cfnames),
+ neighborsAndRanges.participants,
+ cfnames);
}
}
@@ -409,6 +423,7 @@ public class RepairRunnable implements Runnable,
ProgressEventNotifier
long startTime,
TraceState traceState,
List<CommonRange> commonRanges,
+ Set<InetAddressAndPort> preparedEndpoints,
String... cfnames)
{
@@ -447,13 +462,22 @@ public class RepairRunnable implements Runnable,
ProgressEventNotifier
return Futures.immediateFuture(null);
}
}, MoreExecutors.directExecutor());
- Futures.addCallback(repairResult, new
RepairCompleteCallback(parentSession, successfulRanges, startTime, traceState,
hasFailure, executor), MoreExecutors.directExecutor());
+ Futures.addCallback(repairResult,
+ new RepairCompleteCallback(parentSession,
+ successfulRanges,
+ preparedEndpoints,
+ startTime,
+ traceState,
+ hasFailure,
+ executor),
+ MoreExecutors.directExecutor());
}
private void incrementalRepair(UUID parentSession,
long startTime,
TraceState traceState,
NeighborsAndRanges neighborsAndRanges,
+ Set<InetAddressAndPort> preparedEndpoints,
String... cfnames)
{
// the local node also needs to be included in the set of
participants, since coordinator sessions aren't persisted
@@ -474,12 +498,15 @@ public class RepairRunnable implements Runnable,
ProgressEventNotifier
{
ranges.addAll(range);
}
- Futures.addCallback(repairResult, new
RepairCompleteCallback(parentSession, ranges, startTime, traceState,
hasFailure, executor), MoreExecutors.directExecutor());
+ Futures.addCallback(repairResult,
+ new RepairCompleteCallback(parentSession, ranges,
preparedEndpoints, startTime, traceState, hasFailure, executor),
+ MoreExecutors.directExecutor());
}
private void previewRepair(UUID parentSession,
long startTime,
List<CommonRange> commonRanges,
+ Set<InetAddressAndPort> preparedEndpoints,
String... cfnames)
{
@@ -521,6 +548,7 @@ public class RepairRunnable implements Runnable,
ProgressEventNotifier
notification(message);
success("Repair preview completed successfully");
+ ActiveRepairService.instance.cleanUp(parentSession,
preparedEndpoints);
}
catch (Throwable t)
{
@@ -669,6 +697,7 @@ public class RepairRunnable implements Runnable,
ProgressEventNotifier
{
final UUID parentSession;
final Collection<Range<Token>> successfulRanges;
+ final Set<InetAddressAndPort> preparedEndpoints;
final long startTime;
final TraceState traceState;
final AtomicBoolean hasFailure;
@@ -676,6 +705,7 @@ public class RepairRunnable implements Runnable,
ProgressEventNotifier
public RepairCompleteCallback(UUID parentSession,
Collection<Range<Token>>
successfulRanges,
+ Set<InetAddressAndPort>
preparedEndpoints,
long startTime,
TraceState traceState,
AtomicBoolean hasFailure,
@@ -683,6 +713,7 @@ public class RepairRunnable implements Runnable,
ProgressEventNotifier
{
this.parentSession = parentSession;
this.successfulRanges = successfulRanges;
+ this.preparedEndpoints = preparedEndpoints;
this.startTime = startTime;
this.traceState = traceState;
this.hasFailure = hasFailure;
@@ -699,6 +730,7 @@ public class RepairRunnable implements Runnable,
ProgressEventNotifier
else
{
success("Repair completed successfully");
+ ActiveRepairService.instance.cleanUp(parentSession,
preparedEndpoints);
}
executor.shutdownNow();
}
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java
b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 2cdc794..58587be 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -64,6 +64,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.metrics.RepairMetrics;
import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.net.Verb;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.CommonRange;
@@ -77,6 +78,7 @@ import
org.apache.cassandra.repair.consistent.admin.PendingStats;
import org.apache.cassandra.repair.consistent.admin.RepairStats;
import org.apache.cassandra.repair.consistent.RepairedState;
import org.apache.cassandra.repair.consistent.admin.SchemaArgsParser;
+import org.apache.cassandra.repair.messages.CleanupMessage;
import org.apache.cassandra.repair.messages.PrepareMessage;
import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.RepairOption;
@@ -304,6 +306,12 @@ public class ActiveRepairService implements
IEndpointStateChangeSubscriber, IFai
return stats;
}
+ @Override
+ public int parentRepairSessionsCount()
+ {
+ return parentRepairSessions.size();
+ }
+
/**
* Requests repairs for the given keyspace and column families.
*
@@ -597,6 +605,49 @@ public class ActiveRepairService implements
IEndpointStateChangeSubscriber, IFai
return parentRepairSession;
}
+ /**
+ * Send Verb.CLEANUP_MSG to the given endpoints. This results in removing
parent session object from the
+ * endpoint's cache.
+ * This method does not throw an exception in case of a messaging failure.
+ */
+ public void cleanUp(UUID parentRepairSession, Set<InetAddressAndPort>
endpoints)
+ {
+ for (InetAddressAndPort endpoint : endpoints)
+ {
+ try
+ {
+ if (FailureDetector.instance.isAlive(endpoint))
+ {
+ CleanupMessage message = new
CleanupMessage(parentRepairSession);
+ Message<CleanupMessage> msg =
Message.out(Verb.CLEANUP_MSG, message);
+
+ RequestCallback loggingCallback = new RequestCallback()
+ {
+ @Override
+ public void onResponse(Message msg)
+ {
+ logger.trace("Successfully cleaned up {} parent
repair session on {}.", parentRepairSession, endpoint);
+ }
+
+ @Override
+ public void onFailure(InetAddressAndPort from,
RequestFailureReason failureReason)
+ {
+ logger.debug("Failed to clean up parent repair
session {} on {}. The uncleaned sessions will " +
+ "be removed on a node restart. This should
not be a problem unless you see thousands " +
+ "of messages like this.",
parentRepairSession, endpoint);
+ }
+ };
+
+ MessagingService.instance().sendWithCallback(msg,
endpoint, loggingCallback);
+ }
+ }
+ catch (Exception exc)
+ {
+ logger.warn("Failed to send a clean up message to {}",
endpoint, exc);
+ }
+ }
+ }
+
private void failRepair(UUID parentRepairSession, String errorMsg) {
removeParentRepairSession(parentRepairSession);
throw new RuntimeException(errorMsg);
diff --git
a/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
b/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
index 8cffecc..b68cb6f 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
@@ -41,4 +41,13 @@ public interface ActiveRepairServiceMBean
public List<CompositeData> getRepairStats(List<String> schemaArgs, String
rangeString);
public List<CompositeData> getPendingStats(List<String> schemaArgs, String
rangeString);
public List<CompositeData> cleanupPending(List<String> schemaArgs, String
rangeString, boolean force);
+
+ /**
+ * Each ongoing repair (incremental and non-incremental) is represented by
a
+ * {@link ActiveRepairService.ParentRepairSession} entry in the {@link
ActiveRepairService} cache.
+ * Returns the current number of ongoing repairs (the current number of
cached entries).
+ *
+ * @return current size of the internal cache holding {@link
ActiveRepairService.ParentRepairSession} instances
+ */
+ int parentRepairSessionsCount();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]