Repository: cassandra Updated Branches: refs/heads/cassandra-2.1 9b6f55bde -> 6d06f32a7 refs/heads/trunk d305ce42f -> be6295ede
Fix anticompaction blocking ANTI_ENTROPY stage patch by yukim; reviewed by sankalp kohli for CASSANDRA-9151 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6d06f32a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6d06f32a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6d06f32a Branch: refs/heads/cassandra-2.1 Commit: 6d06f32a7729524a195ad67ba82157e390dc3912 Parents: 9b6f55b Author: Yuki Morishita <[email protected]> Authored: Fri May 1 15:06:35 2015 -0500 Committer: Yuki Morishita <[email protected]> Committed: Fri May 1 15:06:35 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../db/compaction/CompactionManager.java | 11 ++-- .../repair/RepairMessageVerbHandler.java | 15 +---- .../cassandra/service/ActiveRepairService.java | 61 ++++++++++++++------ .../cassandra/service/StorageService.java | 6 +- 5 files changed, 55 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d06f32a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 627cc6b..a6e4e41 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -6,6 +6,7 @@ * Update tuple and collection types that use a user-defined type when that UDT is modified (CASSANDRA-9148, CASSANDRA-9192) * Use higher timeout for prepair and snapshot in repair (CASSANDRA-9261) + * Fix anticompaction blocking ANTI_ENTROPY stage (CASSANDRA-9151) Merged from 2.0: * Fix index selection during rebuild with certain table layouts (CASSANDRA-9281) * Fix partition-level-delete-only workload accounting (CASSANDRA-9194) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d06f32a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index 72deb21..7215945 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -50,8 +50,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Multimap; import com.google.common.collect.Multiset; import com.google.common.collect.Sets; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.RateLimiter; +import com.google.common.util.concurrent.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -387,7 +386,7 @@ public class CompactionManager implements CompactionManagerMBean }); } - public Future<?> submitAntiCompaction(final ColumnFamilyStore cfs, + public ListenableFuture<?> submitAntiCompaction(final ColumnFamilyStore cfs, final Collection<Range<Token>> ranges, final Refs<SSTableReader> sstables, final long repairedAt) @@ -417,7 +416,9 @@ public class CompactionManager implements CompactionManagerMBean return Futures.immediateCancelledFuture(); } - return executor.submit(runnable); + ListenableFutureTask<?> task = ListenableFutureTask.create(runnable, null); + executor.submit(task); + return task; } /** @@ -483,7 +484,7 @@ public class CompactionManager implements CompactionManagerMBean cfs.getDataTracker().unmarkCompacting(sstables); } - logger.info(String.format("Completed anticompaction successfully")); + logger.info("Completed anticompaction successfully"); } public void performMaximal(final ColumnFamilyStore cfStore) throws InterruptedException, ExecutionException http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d06f32a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index c7cf4c8..5b25afa 100644 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@ -112,20 +112,7 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> case ANTICOMPACTION_REQUEST: logger.debug("Got anticompaction request"); AnticompactionRequest anticompactionRequest = (AnticompactionRequest) message.payload; - try - { - List<Future<?>> futures = ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession); - FBUtilities.waitOnFutures(futures); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - finally - { - ActiveRepairService.instance.removeParentRepairSession(anticompactionRequest.parentRepairSession); - } - + ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession); break; default: http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d06f32a/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index ac5ed99..5cc26ed 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -18,7 +18,6 @@ package org.apache.cassandra.service; import java.io.File; -import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; @@ -29,6 +28,9 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +38,6 @@ import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor; import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.compaction.CompactionManager; -import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.FailureDetector; @@ -57,7 +58,6 @@ import org.apache.cassandra.repair.messages.ValidationComplete; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.concurrent.Ref; -import org.apache.cassandra.utils.concurrent.RefCounted; import org.apache.cassandra.utils.concurrent.Refs; @@ -320,25 +320,31 @@ public class ActiveRepairService return repairing; } - public synchronized void finishParentSession(UUID parentSession, Set<InetAddress> neighbors, boolean doAntiCompaction) throws InterruptedException, ExecutionException, IOException + /** + * Run final process of repair. + * This removes all resources held by parent repair session, after performing anti compaction if necessary. + * + * @param parentSession Parent session ID + * @param neighbors Repair participants (not including self) + * @param doAntiCompaction true if repair session needs anti compaction + * @throws InterruptedException + * @throws ExecutionException + */ + public synchronized void finishParentSession(UUID parentSession, Set<InetAddress> neighbors, boolean doAntiCompaction) throws InterruptedException, ExecutionException { - try + if (doAntiCompaction) { - if (doAntiCompaction) + for (InetAddress neighbor : neighbors) { - for (InetAddress neighbor : neighbors) - { - AnticompactionRequest acr = new AnticompactionRequest(parentSession); - MessageOut<RepairMessage> req = acr.createMessage(); - MessagingService.instance().sendOneWay(req, neighbor); - } - List<Future<?>> futures = doAntiCompaction(parentSession); - FBUtilities.waitOnFutures(futures); + AnticompactionRequest acr = new AnticompactionRequest(parentSession); + MessageOut<RepairMessage> req = acr.createMessage(); + MessagingService.instance().sendOneWay(req, neighbor); } + doAntiCompaction(parentSession).get(); } - finally + else { - parentRepairSessions.remove(parentSession); + removeParentRepairSession(parentSession); } } @@ -352,12 +358,19 @@ public class ActiveRepairService return parentRepairSessions.remove(parentSessionId); } - public List<Future<?>> doAntiCompaction(UUID parentRepairSession) throws InterruptedException, ExecutionException, IOException + /** + * Submit anti-compaction jobs to CompactionManager. + * When all jobs are done, parent repair session is removed whether those are suceeded or not. + * + * @param parentRepairSession parent repair session ID + * @return Future result of all anti-compaction jobs. + */ + public ListenableFuture<List<Object>> doAntiCompaction(final UUID parentRepairSession) { assert parentRepairSession != null; ParentRepairSession prs = getParentRepairSession(parentRepairSession); - List<Future<?>> futures = new ArrayList<>(); + List<ListenableFuture<?>> futures = new ArrayList<>(); for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet()) { Refs<SSTableReader> sstables = prs.getAndReferenceSSTables(columnFamilyStoreEntry.getKey()); @@ -365,7 +378,17 @@ public class ActiveRepairService futures.add(CompactionManager.instance.submitAntiCompaction(cfs, prs.ranges, sstables, prs.repairedAt)); } - return futures; + ListenableFuture<List<Object>> allAntiCompactionResults = Futures.successfulAsList(futures); + allAntiCompactionResults.addListener(new Runnable() + { + @Override + public void run() + { + removeParentRepairSession(parentRepairSession); + } + }, MoreExecutors.sameThreadExecutor()); + + return allAntiCompactionResults; } public void handleMessage(InetAddress endpoint, RepairMessage message) http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d06f32a/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index d8fa831..8521256 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -2732,7 +2732,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... columnFamilies) throws IOException { - return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential ? RepairParallelism.SEQUENTIAL.ordinal() : RepairParallelism.PARALLEL.ordinal(), dataCenters, hosts, fullRepair, columnFamilies); + return forceRepairRangeAsync(beginToken, endToken, keyspaceName, + isSequential ? RepairParallelism.SEQUENTIAL.ordinal() : RepairParallelism.PARALLEL.ordinal(), + dataCenters, hosts, fullRepair, columnFamilies); } public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, int parallelismDegree, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... columnFamilies) @@ -2939,7 +2941,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } } if (!fullRepair) + { ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successful); + } sendNotification("repair", String.format("Repair command #%d finished", cmd), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()}); } }, null);
