This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 23512cf  Prevent parent repair sessions leak
23512cf is described below

commit 23512cf3da5e8206d8797841f2238cdd86c13d96
Author: jtgrabowski <[email protected]>
AuthorDate: Wed Feb 24 18:40:22 2021 +0000

    Prevent parent repair sessions leak
    
    patch by Jaroslaw Grabowski and Berenguer Blasi; reviewed by Ekaterina Dimitrova and Andrés de la Peña for CASSANDRA-16446
    
    Co-authored-by: jtgrabowski <[email protected]>
    Co-authored-by: Bereng <[email protected]>
---
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/repair/RepairRunnable.java    | 42 +++++++++++++++---
 .../cassandra/service/ActiveRepairService.java     | 51 ++++++++++++++++++++++
 .../service/ActiveRepairServiceMBean.java          |  9 ++++
 4 files changed, 98 insertions(+), 5 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index dbfa272..8be989c 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-beta5
+ * Prevent parent repair sessions leak (CASSANDRA-16446)
  * Fix timestamp issue in SinglePartitionSliceCommandTest testPartitionDeletionRowDeletionTie (CASSANDRA-16443)
  * Promote protocol V5 out of beta (CASSANDRA-14973)
  * Fix incorrect encoding for strings can be UTF8 (CASSANDRA-16429)
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java 
b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 5d8e945..793d2f2 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -393,15 +393,29 @@ public class RepairRunnable implements Runnable, 
ProgressEventNotifier
     {
         if (options.isPreview())
         {
-            previewRepair(parentSession, creationTimeMillis, 
neighborsAndRanges.filterCommonRanges(keyspace, cfnames), cfnames);
+            previewRepair(parentSession,
+                          creationTimeMillis,
+                          neighborsAndRanges.filterCommonRanges(keyspace, 
cfnames),
+                          neighborsAndRanges.participants,
+                          cfnames);
         }
         else if (options.isIncremental())
         {
-            incrementalRepair(parentSession, creationTimeMillis, traceState, 
neighborsAndRanges, cfnames);
+            incrementalRepair(parentSession,
+                              creationTimeMillis,
+                              traceState,
+                              neighborsAndRanges,
+                              neighborsAndRanges.participants,
+                              cfnames);
         }
         else
         {
-            normalRepair(parentSession, creationTimeMillis, traceState, 
neighborsAndRanges.filterCommonRanges(keyspace, cfnames), cfnames);
+            normalRepair(parentSession,
+                         creationTimeMillis,
+                         traceState,
+                         neighborsAndRanges.filterCommonRanges(keyspace, 
cfnames),
+                         neighborsAndRanges.participants,
+                         cfnames);
         }
     }
 
@@ -409,6 +423,7 @@ public class RepairRunnable implements Runnable, 
ProgressEventNotifier
                               long startTime,
                               TraceState traceState,
                               List<CommonRange> commonRanges,
+                              Set<InetAddressAndPort> preparedEndpoints,
                               String... cfnames)
     {
 
@@ -447,13 +462,22 @@ public class RepairRunnable implements Runnable, 
ProgressEventNotifier
                 return Futures.immediateFuture(null);
             }
         }, MoreExecutors.directExecutor());
-        Futures.addCallback(repairResult, new 
RepairCompleteCallback(parentSession, successfulRanges, startTime, traceState, 
hasFailure, executor), MoreExecutors.directExecutor());
+        Futures.addCallback(repairResult,
+                            new RepairCompleteCallback(parentSession,
+                                                       successfulRanges,
+                                                       preparedEndpoints,
+                                                       startTime,
+                                                       traceState,
+                                                       hasFailure,
+                                                       executor),
+                            MoreExecutors.directExecutor());
     }
 
     private void incrementalRepair(UUID parentSession,
                                    long startTime,
                                    TraceState traceState,
                                    NeighborsAndRanges neighborsAndRanges,
+                                   Set<InetAddressAndPort> preparedEndpoints,
                                    String... cfnames)
     {
         // the local node also needs to be included in the set of 
participants, since coordinator sessions aren't persisted
@@ -474,12 +498,15 @@ public class RepairRunnable implements Runnable, 
ProgressEventNotifier
         {
             ranges.addAll(range);
         }
-        Futures.addCallback(repairResult, new 
RepairCompleteCallback(parentSession, ranges, startTime, traceState, 
hasFailure, executor), MoreExecutors.directExecutor());
+        Futures.addCallback(repairResult,
+                            new RepairCompleteCallback(parentSession, ranges, 
preparedEndpoints, startTime, traceState, hasFailure, executor),
+                            MoreExecutors.directExecutor());
     }
 
     private void previewRepair(UUID parentSession,
                                long startTime,
                                List<CommonRange> commonRanges,
+                               Set<InetAddressAndPort> preparedEndpoints,
                                String... cfnames)
     {
 
@@ -521,6 +548,7 @@ public class RepairRunnable implements Runnable, 
ProgressEventNotifier
                     notification(message);
 
                     success("Repair preview completed successfully");
+                    ActiveRepairService.instance.cleanUp(parentSession, 
preparedEndpoints);
                 }
                 catch (Throwable t)
                 {
@@ -669,6 +697,7 @@ public class RepairRunnable implements Runnable, 
ProgressEventNotifier
     {
         final UUID parentSession;
         final Collection<Range<Token>> successfulRanges;
+        final Set<InetAddressAndPort> preparedEndpoints;
         final long startTime;
         final TraceState traceState;
         final AtomicBoolean hasFailure;
@@ -676,6 +705,7 @@ public class RepairRunnable implements Runnable, 
ProgressEventNotifier
 
         public RepairCompleteCallback(UUID parentSession,
                                       Collection<Range<Token>> 
successfulRanges,
+                                      Set<InetAddressAndPort> 
preparedEndpoints,
                                       long startTime,
                                       TraceState traceState,
                                       AtomicBoolean hasFailure,
@@ -683,6 +713,7 @@ public class RepairRunnable implements Runnable, 
ProgressEventNotifier
         {
             this.parentSession = parentSession;
             this.successfulRanges = successfulRanges;
+            this.preparedEndpoints = preparedEndpoints;
             this.startTime = startTime;
             this.traceState = traceState;
             this.hasFailure = hasFailure;
@@ -699,6 +730,7 @@ public class RepairRunnable implements Runnable, 
ProgressEventNotifier
             else
             {
                 success("Repair completed successfully");
+                ActiveRepairService.instance.cleanUp(parentSession, 
preparedEndpoints);
             }
             executor.shutdownNow();
         }
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java 
b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 2cdc794..58587be 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -64,6 +64,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.metrics.RepairMetrics;
 import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.net.Verb;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.CommonRange;
@@ -77,6 +78,7 @@ import 
org.apache.cassandra.repair.consistent.admin.PendingStats;
 import org.apache.cassandra.repair.consistent.admin.RepairStats;
 import org.apache.cassandra.repair.consistent.RepairedState;
 import org.apache.cassandra.repair.consistent.admin.SchemaArgsParser;
+import org.apache.cassandra.repair.messages.CleanupMessage;
 import org.apache.cassandra.repair.messages.PrepareMessage;
 import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.repair.messages.RepairOption;
@@ -304,6 +306,12 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
         return stats;
     }
 
+    @Override
+    public int parentRepairSessionsCount()
+    {
+        return parentRepairSessions.size();
+    }
+
     /**
      * Requests repairs for the given keyspace and column families.
      *
@@ -597,6 +605,49 @@ public class ActiveRepairService implements 
IEndpointStateChangeSubscriber, IFai
         return parentRepairSession;
     }
 
+    /**
+     * Send Verb.CLEANUP_MSG to the given endpoints. This results in removing 
parent session object from the
+     * endpoint's cache.
+     * This method does not throw an exception in case of a messaging failure.
+     */
+    public void cleanUp(UUID parentRepairSession, Set<InetAddressAndPort> 
endpoints)
+    {
+        for (InetAddressAndPort endpoint : endpoints)
+        {
+            try
+            {
+                if (FailureDetector.instance.isAlive(endpoint))
+                {
+                    CleanupMessage message = new 
CleanupMessage(parentRepairSession);
+                    Message<CleanupMessage> msg = 
Message.out(Verb.CLEANUP_MSG, message);
+
+                    RequestCallback loggingCallback = new RequestCallback()
+                    {
+                        @Override
+                        public void onResponse(Message msg)
+                        {
+                            logger.trace("Successfully cleaned up {} parent 
repair session on {}.", parentRepairSession, endpoint);
+                        }
+
+                        @Override
+                        public void onFailure(InetAddressAndPort from, 
RequestFailureReason failureReason)
+                        {
+                            logger.debug("Failed to clean up parent repair 
session {} on {}. The uncleaned sessions will " +
+                                    "be removed on a node restart. This should 
not be a problem unless you see thousands " +
+                                    "of messages like this.", 
parentRepairSession, endpoint);
+                        }
+                    };
+
+                    MessagingService.instance().sendWithCallback(msg, 
endpoint, loggingCallback);
+                }
+            }
+            catch (Exception exc)
+            {
+                logger.warn("Failed to send a clean up message to {}", 
endpoint, exc);
+            }
+        }
+    }
+
     private void failRepair(UUID parentRepairSession, String errorMsg) {
         removeParentRepairSession(parentRepairSession);
         throw new RuntimeException(errorMsg);
diff --git 
a/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java 
b/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
index 8cffecc..b68cb6f 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairServiceMBean.java
@@ -41,4 +41,13 @@ public interface ActiveRepairServiceMBean
     public List<CompositeData> getRepairStats(List<String> schemaArgs, String 
rangeString);
     public List<CompositeData> getPendingStats(List<String> schemaArgs, String 
rangeString);
     public List<CompositeData> cleanupPending(List<String> schemaArgs, String 
rangeString, boolean force);
+
+    /**
+     * Each ongoing repair (incremental and non-incremental) is represented by 
a
+     * {@link ActiveRepairService.ParentRepairSession} entry in the {@link 
ActiveRepairService} cache.
+     * Returns the current number of ongoing repairs (the current number of 
cached entries).
+     *
+     * @return current size of the internal cache holding {@link 
ActiveRepairService.ParentRepairSession} instances
+     */
+    int parentRepairSessionsCount();
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to