Repository: cassandra Updated Branches: refs/heads/trunk 7df240e74 -> 3234c0704
Send IR coordinator messages synchronously Patch by Blake Eggleston; Reviewed by Marcus Eriksson for CASSANDRA-13673 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3234c070 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3234c070 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3234c070 Branch: refs/heads/trunk Commit: 3234c0704a4fef08dedc4ff78f4ded3b9226fe80 Parents: 7df240e Author: Blake Eggleston <bdeggles...@gmail.com> Authored: Wed Jul 5 13:20:32 2017 -0700 Committer: Blake Eggleston <bdeggles...@gmail.com> Committed: Thu Jul 6 10:31:37 2017 -0700 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/repair/RepairRunnable.java | 3 +- .../repair/consistent/ConsistentSession.java | 5 ++- .../repair/consistent/CoordinatorSession.java | 35 ++++++++------------ .../consistent/CoordinatorSessionTest.java | 22 +++++------- .../consistent/CoordinatorSessionsTest.java | 3 +- 6 files changed, 27 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/3234c070/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 22045e8..6cd8bc5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Send IR coordinator messages synchronously (CASSANDRA-13673) * Flush system.repair table before IR finalize promise (CASSANDRA-13660) * Fix column filter creation for wildcard queries (CASSANDRA-13650) * Add 'nodetool getbatchlogreplaythrottle' and 'nodetool setbatchlogreplaythrottle' (CASSANDRA-13614) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3234c070/src/java/org/apache/cassandra/repair/RepairRunnable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairRunnable.java b/src/java/org/apache/cassandra/repair/RepairRunnable.java index 29347a4..3f761ee 100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@ -329,8 +329,7 @@ public class RepairRunnable extends WrappedRunnable implements ProgressEventNoti CoordinatorSession coordinatorSession = ActiveRepairService.instance.consistent.coordinated.registerSession(parentSession, allParticipants); ListeningExecutorService executor = createExecutor(); AtomicBoolean hasFailure = new AtomicBoolean(false); - ListenableFuture repairResult = coordinatorSession.execute(executor, - () -> submitRepairSessions(parentSession, true, executor, commonRanges, cfnames), + ListenableFuture repairResult = coordinatorSession.execute(() -> submitRepairSessions(parentSession, true, executor, commonRanges, cfnames), hasFailure); Collection<Range<Token>> ranges = new HashSet<>(); for (Collection<Range<Token>> range : Iterables.transform(commonRanges, cr -> cr.right)) http://git-wip-us.apache.org/repos/asf/cassandra/blob/3234c070/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java index af0a0dd..803a1f8 100644 --- a/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java +++ b/src/java/org/apache/cassandra/repair/consistent/ConsistentSession.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.Executor; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; @@ -96,8 +95,8 @@ import org.apache.cassandra.tools.nodetool.RepairAdmin; * conflicts with in progress compactions. The sstables will be marked repaired as part of the normal compaction process. * <p/> * - * On the coordinator side, see {@link CoordinatorSession#finalizePropose(Executor)}, {@link CoordinatorSession#handleFinalizePromise(InetAddress, boolean)}, - * & {@link CoordinatorSession#finalizeCommit(Executor)} + * On the coordinator side, see {@link CoordinatorSession#finalizePropose()}, {@link CoordinatorSession#handleFinalizePromise(InetAddress, boolean)}, + * & {@link CoordinatorSession#finalizeCommit()} * <p/> * * On the local session side, see {@link LocalSessions#handleFinalizeProposeMessage(InetAddress, FinalizePropose)} http://git-wip-us.apache.org/repos/asf/cassandra/blob/3234c070/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java index bac749e..830ed2c 100644 --- a/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java +++ b/src/java/org/apache/cassandra/repair/consistent/CoordinatorSession.java @@ -22,7 +22,6 @@ import java.net.InetAddress; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; @@ -34,7 +33,6 @@ import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import org.apache.commons.lang3.time.DurationFormatUtils; import org.slf4j.Logger; @@ -139,7 +137,7 @@ public class CoordinatorSession extends ConsistentSession MessagingService.instance().sendOneWay(messageOut, destination); } - public ListenableFuture<Boolean> prepare(Executor executor) + public ListenableFuture<Boolean> prepare() { Preconditions.checkArgument(allStates(State.PREPARING)); @@ -147,7 +145,7 @@ public class CoordinatorSession extends ConsistentSession PrepareConsistentRequest message = new PrepareConsistentRequest(sessionID, coordinator, participants); for (final InetAddress participant : participants) { - executor.execute(() -> sendMessage(participant, message)); + sendMessage(participant, message); } return prepareFuture; } @@ -181,14 +179,14 @@ public class CoordinatorSession extends ConsistentSession setAll(State.REPAIRING); } - public synchronized ListenableFuture<Boolean> finalizePropose(Executor executor) + public synchronized ListenableFuture<Boolean> finalizePropose() { Preconditions.checkArgument(allStates(State.REPAIRING)); logger.debug("Proposing finalization of repair session {}", sessionID); FinalizePropose message = new FinalizePropose(sessionID); for (final InetAddress participant : participants) { - executor.execute(() -> sendMessage(participant, message)); + sendMessage(participant, message); } return finalizeProposeFuture; } @@ -217,25 +215,20 @@ public class CoordinatorSession extends ConsistentSession } } - public synchronized void finalizeCommit(Executor executor) + public synchronized void finalizeCommit() { Preconditions.checkArgument(allStates(State.FINALIZE_PROMISED)); logger.debug("Committing finalization of repair session {}", sessionID); FinalizeCommit message = new FinalizeCommit(sessionID); for (final InetAddress participant : participants) { - executor.execute(() -> sendMessage(participant, message)); + sendMessage(participant, message); } setAll(State.FINALIZED); logger.info("Incremental repair session {} completed", sessionID); } - public void fail() - { - fail(MoreExecutors.directExecutor()); - } - - public synchronized void fail(Executor executor) + public synchronized void fail() { logger.info("Incremental repair session {} failed", sessionID); FailSession message = new FailSession(sessionID); @@ -243,7 +236,7 @@ public class CoordinatorSession extends ConsistentSession { if (participantStates.get(participant) != State.FAILED) { - executor.execute(() -> sendMessage(participant, message)); + sendMessage(participant, message); } } setAll(State.FAILED); @@ -262,12 +255,12 @@ public class CoordinatorSession extends ConsistentSession /** * Runs the asynchronous consistent repair session. Actual repair sessions are scheduled via a submitter to make unit testing easier */ - public ListenableFuture execute(Executor executor, Supplier<ListenableFuture<List<RepairSessionResult>>> sessionSubmitter, AtomicBoolean hasFailure) + public ListenableFuture execute(Supplier<ListenableFuture<List<RepairSessionResult>>> sessionSubmitter, AtomicBoolean hasFailure) { logger.info("Beginning coordination of incremental repair session {}", sessionID); sessionStart = System.currentTimeMillis(); - ListenableFuture<Boolean> prepareResult = prepare(executor); + ListenableFuture<Boolean> prepareResult = prepare(); // run repair sessions normally ListenableFuture<List<RepairSessionResult>> repairSessionResults = Futures.transform(prepareResult, new AsyncFunction<Boolean, List<RepairSessionResult>>() @@ -309,7 +302,7 @@ public class CoordinatorSession extends ConsistentSession } else { - return finalizePropose(executor); + return finalizePropose(); } } }); @@ -325,7 +318,7 @@ public class CoordinatorSession extends ConsistentSession { logger.debug("Incremental repair {} finalization phase completed in {}", sessionID, formatDuration(finalizeStart, System.currentTimeMillis())); } - finalizeCommit(executor); + finalizeCommit(); if (logger.isDebugEnabled()) { logger.debug("Incremental repair {} phase completed in {}", sessionID, formatDuration(sessionStart, System.currentTimeMillis())); @@ -334,7 +327,7 @@ public class CoordinatorSession extends ConsistentSession else { hasFailure.set(true); - fail(executor); + fail(); } } @@ -345,7 +338,7 @@ public class CoordinatorSession extends ConsistentSession logger.debug("Incremental repair {} phase failed in {}", sessionID, formatDuration(sessionStart, System.currentTimeMillis())); } hasFailure.set(true); - fail(executor); + fail(); } }); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3234c070/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java index 3c27b5e..fb312c3 100644 --- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java +++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionTest.java @@ -24,14 +24,12 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Supplier; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.SettableFuture; import org.junit.Assert; @@ -118,26 +116,26 @@ public class CoordinatorSessionTest extends AbstractRepairTest Runnable onFinalizeCommit = null; boolean finalizeCommitCalled = false; - public synchronized void finalizeCommit(Executor executor) + public synchronized void finalizeCommit() { finalizeCommitCalled = true; if (onFinalizeCommit != null) { onFinalizeCommit.run(); } - super.finalizeCommit(executor); + super.finalizeCommit(); } Runnable onFail = null; boolean failCalled = false; - public synchronized void fail(Executor executor) + public synchronized void fail() { failCalled = true; if (onFail != null) { onFail.run(); } - super.fail(executor); + super.fail(); } } @@ -209,7 +207,6 @@ public class CoordinatorSessionTest extends AbstractRepairTest public void successCase() { InstrumentedCoordinatorSession coordinator = createInstrumentedSession(); - Executor executor = MoreExecutors.directExecutor(); AtomicBoolean repairSubmitted = new AtomicBoolean(false); SettableFuture<List<RepairSessionResult>> repairFuture = SettableFuture.create(); Supplier<ListenableFuture<List<RepairSessionResult>>> sessionSupplier = () -> @@ -222,7 +219,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest AtomicBoolean hasFailures = new AtomicBoolean(false); Assert.assertFalse(repairSubmitted.get()); Assert.assertTrue(coordinator.sentMessages.isEmpty()); - ListenableFuture sessionResult = coordinator.execute(executor, sessionSupplier, hasFailures); + ListenableFuture sessionResult = coordinator.execute(sessionSupplier, hasFailures); for (InetAddress participant : PARTICIPANTS) { @@ -294,7 +291,6 @@ public class CoordinatorSessionTest extends AbstractRepairTest public void failedRepairs() { InstrumentedCoordinatorSession coordinator = createInstrumentedSession(); - Executor executor = MoreExecutors.directExecutor(); AtomicBoolean repairSubmitted = new AtomicBoolean(false); SettableFuture<List<RepairSessionResult>> repairFuture = SettableFuture.create(); Supplier<ListenableFuture<List<RepairSessionResult>>> sessionSupplier = () -> @@ -307,7 +303,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest AtomicBoolean hasFailures = new AtomicBoolean(false); Assert.assertFalse(repairSubmitted.get()); Assert.assertTrue(coordinator.sentMessages.isEmpty()); - ListenableFuture sessionResult = coordinator.execute(executor, sessionSupplier, hasFailures); + ListenableFuture sessionResult = coordinator.execute(sessionSupplier, hasFailures); for (InetAddress participant : PARTICIPANTS) { PrepareConsistentRequest expected = new PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new HashSet<>(PARTICIPANTS)); @@ -357,7 +353,6 @@ public class CoordinatorSessionTest extends AbstractRepairTest public void failedPrepare() { InstrumentedCoordinatorSession coordinator = createInstrumentedSession(); - Executor executor = MoreExecutors.directExecutor(); AtomicBoolean repairSubmitted = new AtomicBoolean(false); SettableFuture<List<RepairSessionResult>> repairFuture = SettableFuture.create(); Supplier<ListenableFuture<List<RepairSessionResult>>> sessionSupplier = () -> @@ -370,7 +365,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest AtomicBoolean hasFailures = new AtomicBoolean(false); Assert.assertFalse(repairSubmitted.get()); Assert.assertTrue(coordinator.sentMessages.isEmpty()); - ListenableFuture sessionResult = coordinator.execute(executor, sessionSupplier, hasFailures); + ListenableFuture sessionResult = coordinator.execute(sessionSupplier, hasFailures); for (InetAddress participant : PARTICIPANTS) { PrepareConsistentRequest expected = new PrepareConsistentRequest(coordinator.sessionID, COORDINATOR, new HashSet<>(PARTICIPANTS)); @@ -413,7 +408,6 @@ public class CoordinatorSessionTest extends AbstractRepairTest public void failedPropose() { InstrumentedCoordinatorSession coordinator = createInstrumentedSession(); - Executor executor = MoreExecutors.directExecutor(); AtomicBoolean repairSubmitted = new AtomicBoolean(false); SettableFuture<List<RepairSessionResult>> repairFuture = SettableFuture.create(); Supplier<ListenableFuture<List<RepairSessionResult>>> sessionSupplier = () -> @@ -426,7 +420,7 @@ public class CoordinatorSessionTest extends AbstractRepairTest AtomicBoolean hasFailures = new AtomicBoolean(false); Assert.assertFalse(repairSubmitted.get()); Assert.assertTrue(coordinator.sentMessages.isEmpty()); - ListenableFuture sessionResult = coordinator.execute(executor, sessionSupplier, hasFailures); + ListenableFuture sessionResult = coordinator.execute(sessionSupplier, hasFailures); for (InetAddress participant : PARTICIPANTS) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/3234c070/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java index b7adb27..b40e185 100644 --- a/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java +++ b/test/unit/org/apache/cassandra/repair/consistent/CoordinatorSessionsTest.java @@ -21,7 +21,6 @@ package org.apache.cassandra.repair.consistent; import java.net.InetAddress; import java.util.Set; import java.util.UUID; -import java.util.concurrent.Executor; import com.google.common.collect.Sets; import org.junit.Assert; @@ -76,7 +75,7 @@ public class CoordinatorSessionsTest extends AbstractRepairTest } int failCalls = 0; - public synchronized void fail(Executor executor) + public synchronized void fail() { failCalls++; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org