Repository: cassandra
Updated Branches:
  refs/heads/trunk 7df240e74 -> 3234c0704


Send IR coordinator messages synchronously

Patch by Blake Eggleston; Reviewed by Marcus Eriksson for CASSANDRA-13673


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3234c070
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3234c070
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3234c070

Branch: refs/heads/trunk
Commit: 3234c0704a4fef08dedc4ff78f4ded3b9226fe80
Parents: 7df240e
Author: Blake Eggleston <bdeggles...@gmail.com>
Authored: Wed Jul 5 13:20:32 2017 -0700
Committer: Blake Eggleston <bdeggles...@gmail.com>
Committed: Thu Jul 6 10:31:37 2017 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/repair/RepairRunnable.java |  3 +-
 .../repair/consistent/ConsistentSession.java    |  5 ++-
 .../repair/consistent/CoordinatorSession.java   | 35 ++++++++------------
 .../consistent/CoordinatorSessionTest.java      | 22 +++++-------
 .../consistent/CoordinatorSessionsTest.java     |  3 +-
 6 files changed, 27 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3234c070/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 22045e8..6cd8bc5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Send IR coordinator messages synchronously (CASSANDRA-13673)
  * Flush system.repair table before IR finalize promise (CASSANDRA-13660)
  * Fix column filter creation for wildcard queries (CASSANDRA-13650)
  * Add 'nodetool getbatchlogreplaythrottle' and 'nodetool setbatchlogreplaythrottle' (CASSANDRA-13614)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3234c070/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java
index 29347a4..3f761ee 100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@ -329,8 +329,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti
         CoordinatorSession coordinatorSession = ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession, allParticipants);
         ListeningExecutorService executor = createExecutor();
         AtomicBoolean hasFailure = new AtomicBoolean(false);
-        ListenableFuture repairResult = coordinatorSession.execute(executor,
-                                                                   () -> submitRepairSessions(parentSession, true, executor, commonRanges, cfnames),
+        ListenableFuture repairResult = coordinatorSession.execute(() -> submitRepairSessions(parentSession, true, executor, commonRanges, cfnames),
                                                                    hasFailure);
         Collection<Range<Token>> ranges = new HashSet<>();
         for (Collection<Range<Token>> range : Iterables.transform(commonRanges, cr -> cr.right))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3234c070/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
index af0a0dd..803a1f8 100644
--- a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.Executor;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
@@ -96,8 +95,8 @@ import org.apache.cassandra.tools.nodetool.RepairAdmin;
  *  conflicts with in progress compactions. The sstables will be marked repaired as part of the normal compaction process.
  *  <p/>
  *
- *  On the coordinator side, see {@link CoordinatorSession#finalizePropose(Executor)}, {@link CoordinatorSession#handleFinalizePromise(InetAddress, boolean)},
- *  & {@link CoordinatorSession#finalizeCommit(Executor)}
+ *  On the coordinator side, see {@link CoordinatorSession#finalizePropose()}, {@link CoordinatorSession#handleFinalizePromise(InetAddress, boolean)},
+ *  & {@link CoordinatorSession#finalizeCommit()}
  *  <p/>
  *
  *  On the local session side, see {@link LocalSessions#handleFinalizeProposeMessage(InetAddress, FinalizePropose)}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3234c070/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
index bac749e..830ed2c 100644
--- a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
+++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java
@@ -22,7 +22,6 @@ import java.net.InetAddress;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 
@@ -34,7 +33,6 @@ import com.google.common.util.concurrent.AsyncFunction;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 import org.apache.commons.lang3.time.DurationFormatUtils;
 import org.slf4j.Logger;
@@ -139,7 +137,7 @@ public class CoordinatorSession extends ConsistentSession
         MessagingService.instance().sendOneWay(messageOut, destination);
     }
 
-    public ListenableFuture<Boolean> prepare(Executor executor)
+    public ListenableFuture<Boolean> prepare()
     {
         Preconditions.checkArgument(allStates(State.PREPARING));
 
@@ -147,7 +145,7 @@ public class CoordinatorSession extends ConsistentSession
         PrepareConsistentRequest message = new PrepareConsistentRequest(sessionID, coordinator, participants);
         for (final InetAddress participant : participants)
         {
-            executor.execute(() -> sendMessage(participant, message));
+            sendMessage(participant, message);
         }
         return prepareFuture;
     }
@@ -181,14 +179,14 @@ public class CoordinatorSession extends ConsistentSession
         setAll(State.REPAIRING);
     }
 
-    public synchronized ListenableFuture<Boolean> finalizePropose(Executor executor)
+    public synchronized ListenableFuture<Boolean> finalizePropose()
     {
         Preconditions.checkArgument(allStates(State.REPAIRING));
         logger.debug("Proposing finalization of repair session {}", sessionID);
         FinalizePropose message = new FinalizePropose(sessionID);
         for (final InetAddress participant : participants)
         {
-            executor.execute(() -> sendMessage(participant, message));
+            sendMessage(participant, message);
         }
         return finalizeProposeFuture;
     }
@@ -217,25 +215,20 @@ public class CoordinatorSession extends ConsistentSession
         }
     }
 
-    public synchronized void finalizeCommit(Executor executor)
+    public synchronized void finalizeCommit()
     {
         Preconditions.checkArgument(allStates(State.FINALIZE_PROMISED));
         logger.debug("Committing finalization of repair session {}", sessionID);
         FinalizeCommit message = new FinalizeCommit(sessionID);
         for (final InetAddress participant : participants)
         {
-            executor.execute(() -> sendMessage(participant, message));
+            sendMessage(participant, message);
         }
         setAll(State.FINALIZED);
         logger.info("Incremental repair session {} completed", sessionID);
     }
 
-    public void fail()
-    {
-        fail(MoreExecutors.directExecutor());
-    }
-
-    public synchronized void fail(Executor executor)
+    public synchronized void fail()
     {
         logger.info("Incremental repair session {} failed", sessionID);
         FailSession message = new FailSession(sessionID);
@@ -243,7 +236,7 @@ public class CoordinatorSession extends ConsistentSession
         {
             if (participantStates.get(participant) != State.FAILED)
             {
-                executor.execute(() -> sendMessage(participant, message));
+                sendMessage(participant, message);
             }
         }
         setAll(State.FAILED);
@@ -262,12 +255,12 @@ public class CoordinatorSession extends ConsistentSession
     /**
      * Runs the asynchronous consistent repair session. Actual repair sessions are scheduled via a submitter to make unit testing easier
      */
-    public ListenableFuture execute(Executor executor, Supplier<ListenableFuture<List<RepairSessionResult>>> sessionSubmitter, AtomicBoolean hasFailure)
+    public ListenableFuture execute(Supplier<ListenableFuture<List<RepairSessionResult>>> sessionSubmitter, AtomicBoolean hasFailure)
     {
         logger.info("Beginning coordination of incremental repair session {}", sessionID);
 
         sessionStart = System.currentTimeMillis();
-        ListenableFuture<Boolean> prepareResult = prepare(executor);
+        ListenableFuture<Boolean> prepareResult = prepare();
 
         // run repair sessions normally
         ListenableFuture<List<RepairSessionResult>> repairSessionResults = Futures.transform(prepareResult, new AsyncFunction<Boolean, List<RepairSessionResult>>()
@@ -309,7 +302,7 @@ public class CoordinatorSession extends ConsistentSession
                 }
                 else
                 {
-                    return finalizePropose(executor);
+                    return finalizePropose();
                 }
             }
         });
@@ -325,7 +318,7 @@ public class CoordinatorSession extends ConsistentSession
                     {
                         logger.debug("Incremental repair {} finalization phase completed in {}", sessionID, formatDuration(finalizeStart, System.currentTimeMillis()));
                     }
-                    finalizeCommit(executor);
+                    finalizeCommit();
                     if (logger.isDebugEnabled())
                     {
                         logger.debug("Incremental repair {} phase completed in {}", sessionID, formatDuration(sessionStart, System.currentTimeMillis()));
@@ -334,7 +327,7 @@ public class CoordinatorSession extends ConsistentSession
                 else
                 {
                     hasFailure.set(true);
-                    fail(executor);
+                    fail();
                 }
             }
 
@@ -345,7 +338,7 @@ public class CoordinatorSession extends ConsistentSession
                     logger.debug("Incremental repair {} phase failed in {}", sessionID, formatDuration(sessionStart, System.currentTimeMillis()));
                 }
                 hasFailure.set(true);
-                fail(executor);
+                fail();
             }
         });
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3234c070/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
index 3c27b5e..fb312c3 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java
@@ -24,14 +24,12 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
 
 import org.junit.Assert;
@@ -118,26 +116,26 @@ public class CoordinatorSessionTest extends AbstractRepairTest
 
         Runnable onFinalizeCommit = null;
         boolean finalizeCommitCalled = false;
-        public synchronized void finalizeCommit(Executor executor)
+        public synchronized void finalizeCommit()
         {
             finalizeCommitCalled = true;
             if (onFinalizeCommit != null)
             {
                 onFinalizeCommit.run();
             }
-            super.finalizeCommit(executor);
+            super.finalizeCommit();
         }
 
         Runnable onFail = null;
         boolean failCalled = false;
-        public synchronized void fail(Executor executor)
+        public synchronized void fail()
         {
             failCalled = true;
             if (onFail != null)
             {
                 onFail.run();
             }
-            super.fail(executor);
+            super.fail();
         }
     }
 
@@ -209,7 +207,6 @@ public class CoordinatorSessionTest extends AbstractRepairTest
     public void successCase()
     {
         InstrumentedCoordinatorSession coordinator = createInstrumentedSession();
-        Executor executor = MoreExecutors.directExecutor();
         AtomicBoolean repairSubmitted = new AtomicBoolean(false);
         SettableFuture<List<RepairSessionResult>> repairFuture = SettableFuture.create();
         Supplier<ListenableFuture<List<RepairSessionResult>>> sessionSupplier = () ->
@@ -222,7 +219,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
         AtomicBoolean hasFailures = new AtomicBoolean(false);
         Assert.assertFalse(repairSubmitted.get());
         Assert.assertTrue(coordinator.sentMessages.isEmpty());
-        ListenableFuture sessionResult = coordinator.execute(executor, sessionSupplier, hasFailures);
+        ListenableFuture sessionResult = coordinator.execute(sessionSupplier, hasFailures);
 
         for (InetAddress participant : PARTICIPANTS)
         {
@@ -294,7 +291,6 @@ public class CoordinatorSessionTest extends AbstractRepairTest
     public void failedRepairs()
     {
         InstrumentedCoordinatorSession coordinator = createInstrumentedSession();
-        Executor executor = MoreExecutors.directExecutor();
         AtomicBoolean repairSubmitted = new AtomicBoolean(false);
         SettableFuture<List<RepairSessionResult>> repairFuture = SettableFuture.create();
         Supplier<ListenableFuture<List<RepairSessionResult>>> sessionSupplier = () ->
@@ -307,7 +303,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
         AtomicBoolean hasFailures = new AtomicBoolean(false);
         Assert.assertFalse(repairSubmitted.get());
         Assert.assertTrue(coordinator.sentMessages.isEmpty());
-        ListenableFuture sessionResult = coordinator.execute(executor, sessionSupplier, hasFailures);
+        ListenableFuture sessionResult = coordinator.execute(sessionSupplier, hasFailures);
         for (InetAddress participant : PARTICIPANTS)
         {
             PrepareConsistentRequest expected = new PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new HashSet<>(PARTICIPANTS));
@@ -357,7 +353,6 @@ public class CoordinatorSessionTest extends AbstractRepairTest
     public void failedPrepare()
     {
         InstrumentedCoordinatorSession coordinator = createInstrumentedSession();
-        Executor executor = MoreExecutors.directExecutor();
         AtomicBoolean repairSubmitted = new AtomicBoolean(false);
         SettableFuture<List<RepairSessionResult>> repairFuture = SettableFuture.create();
         Supplier<ListenableFuture<List<RepairSessionResult>>> sessionSupplier = () ->
@@ -370,7 +365,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
         AtomicBoolean hasFailures = new AtomicBoolean(false);
         Assert.assertFalse(repairSubmitted.get());
         Assert.assertTrue(coordinator.sentMessages.isEmpty());
-        ListenableFuture sessionResult = coordinator.execute(executor, sessionSupplier, hasFailures);
+        ListenableFuture sessionResult = coordinator.execute(sessionSupplier, hasFailures);
         for (InetAddress participant : PARTICIPANTS)
         {
             PrepareConsistentRequest expected = new PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new HashSet<>(PARTICIPANTS));
@@ -413,7 +408,6 @@ public class CoordinatorSessionTest extends AbstractRepairTest
     public void failedPropose()
     {
         InstrumentedCoordinatorSession coordinator = createInstrumentedSession();
-        Executor executor = MoreExecutors.directExecutor();
         AtomicBoolean repairSubmitted = new AtomicBoolean(false);
         SettableFuture<List<RepairSessionResult>> repairFuture = SettableFuture.create();
         Supplier<ListenableFuture<List<RepairSessionResult>>> sessionSupplier = () ->
@@ -426,7 +420,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest
         AtomicBoolean hasFailures = new AtomicBoolean(false);
         Assert.assertFalse(repairSubmitted.get());
         Assert.assertTrue(coordinator.sentMessages.isEmpty());
-        ListenableFuture sessionResult = coordinator.execute(executor, sessionSupplier, hasFailures);
+        ListenableFuture sessionResult = coordinator.execute(sessionSupplier, hasFailures);
 
         for (InetAddress participant : PARTICIPANTS)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3234c070/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
index b7adb27..b40e185 100644
--- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
+++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java
@@ -21,7 +21,6 @@ package org.apache.cassandra.repair.consistent;
 import java.net.InetAddress;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.Executor;
 
 import com.google.common.collect.Sets;
 import org.junit.Assert;
@@ -76,7 +75,7 @@ public class CoordinatorSessionsTest extends AbstractRepairTest
         }
 
         int failCalls = 0;
-        public synchronized void fail(Executor executor)
+        public synchronized void fail()
         {
             failCalls++;
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to