Merge branch 'cassandra-2.1' into trunk
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a583f70e Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a583f70e Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a583f70e Branch: refs/heads/trunk Commit: a583f70eeeada31478a55b5774ef222c55956220 Parents: 40a7e86 93478ab Author: Yuki Morishita <[email protected]> Authored: Fri May 8 17:53:37 2015 -0500 Committer: Yuki Morishita <[email protected]> Committed: Fri May 8 17:53:37 2015 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/db/SystemKeyspace.java | 31 +++++++ .../cassandra/repair/AnticompactionTask.java | 98 ++++++++++++++++++++ .../repair/RepairMessageVerbHandler.java | 31 ++++--- .../apache/cassandra/repair/RepairRunnable.java | 34 +++---- .../cassandra/service/ActiveRepairService.java | 27 +++--- .../apache/cassandra/utils/SemanticVersion.java | 2 +- 7 files changed, 177 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a583f70e/CHANGES.txt ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/a583f70e/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/db/SystemKeyspace.java index c9d77f4,5beb709..6b4bb73 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@@ -673,6 -546,37 +673,37 @@@ public final class SystemKeyspac } /** + * Get release version for given endpoint. + * If release version is unknown, then this returns null. + * + * @param ep endpoint address to check + * @return Release version or null if version is unknown. 
+ */ + public static SemanticVersion getReleaseVersion(InetAddress ep) + { + try + { + if (FBUtilities.getBroadcastAddress().equals(ep)) + { + return new SemanticVersion(FBUtilities.getReleaseVersionString()); + } + String req = "SELECT release_version FROM system.%s WHERE peer=?"; - UntypedResultSet result = executeInternal(String.format(req, PEERS_CF), ep); ++ UntypedResultSet result = executeInternal(String.format(req, PEERS), ep); + if (result != null && result.one().has("release_version")) + { + return new SemanticVersion(result.one().getString("release_version")); + } + // version is unknown + return null; + } + catch (IllegalArgumentException e) + { + // version string cannot be parsed + return null; + } + } + + /** * One of three things will happen if you try to read the system keyspace: * 1. files are present and you can read them: great * 2. no files are there: great (new node is assumed) http://git-wip-us.apache.org/repos/asf/cassandra/blob/a583f70e/src/java/org/apache/cassandra/repair/AnticompactionTask.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/repair/AnticompactionTask.java index 0000000,e505d91..d1bbb82 mode 000000,100644..100644 --- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java +++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java @@@ -1,0 -1,93 +1,98 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.cassandra.repair; + + import java.net.InetAddress; ++import java.util.Collection; + import java.util.UUID; + import java.util.concurrent.TimeUnit; + + import com.google.common.util.concurrent.AbstractFuture; + + import org.apache.cassandra.db.SystemKeyspace; ++import org.apache.cassandra.dht.Range; ++import org.apache.cassandra.dht.Token; + import org.apache.cassandra.net.IAsyncCallbackWithFailure; + import org.apache.cassandra.net.MessageIn; + import org.apache.cassandra.net.MessagingService; + import org.apache.cassandra.repair.messages.AnticompactionRequest; + import org.apache.cassandra.utils.SemanticVersion; + + public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable + { + /* + * Version that anticompaction response is not supported up to. + * If Cassandra version is more than this, we need to wait for anticompaction response. 
+ */ + private static final SemanticVersion VERSION_CHECKER = new SemanticVersion("2.1.5"); + + private final UUID parentSession; + private final InetAddress neighbor; ++ private final Collection<Range<Token>> successfulRanges; + - public AnticompactionTask(UUID parentSession, InetAddress neighbor) ++ public AnticompactionTask(UUID parentSession, InetAddress neighbor, Collection<Range<Token>> successfulRanges) + { + this.parentSession = parentSession; + this.neighbor = neighbor; ++ this.successfulRanges = successfulRanges; + } + + public void run() + { - AnticompactionRequest acr = new AnticompactionRequest(parentSession); ++ AnticompactionRequest acr = new AnticompactionRequest(parentSession, successfulRanges); + SemanticVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbor); + if (peerVersion != null && peerVersion.compareTo(VERSION_CHECKER) > 0) + { + MessagingService.instance().sendRR(acr.createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true); + } + else + { + MessagingService.instance().sendOneWay(acr.createMessage(), neighbor); + // immediately return after sending request + set(neighbor); + } + } + + /** + * Callback for anticompaction request. Run on INTERNAL_RESPONSE stage. 
+ */ + public static class AnticompactionCallback implements IAsyncCallbackWithFailure + { + final AnticompactionTask task; + + public AnticompactionCallback(AnticompactionTask task) + { + this.task = task; + } + + public void response(MessageIn msg) + { + task.set(msg.from); + } + + public boolean isLatencyForSnitch() + { + return false; + } + + public void onFailure(InetAddress from) + { + task.setException(new RuntimeException("Anticompaction failed or timed out in " + from)); + } + } + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/a583f70e/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index 4984d3a,60b2243..e7613c5 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@@ -17,20 -17,15 +17,12 @@@ */ package org.apache.cassandra.repair; --import java.util.ArrayList; --import java.util.Collections; --import java.util.List; - import java.util.Set; --import java.util.UUID; --import java.util.concurrent.Future; ++import java.util.*; import com.google.common.base.Predicate; +import com.google.common.collect.Sets; - - import org.apache.cassandra.dht.Bounds; - import org.apache.cassandra.dht.Range; - import org.apache.cassandra.dht.Token; - import org.apache.cassandra.io.sstable.format.SSTableReader; + import com.google.common.util.concurrent.ListenableFuture; + import com.google.common.util.concurrent.MoreExecutors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -38,14 -33,18 +30,17 @@@ import org.apache.cassandra.config.Sche import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.compaction.CompactionManager; + import org.apache.cassandra.dht.Bounds; import org.apache.cassandra.dht.LocalPartitioner; + import 
org.apache.cassandra.dht.Range; + import org.apache.cassandra.dht.Token; -import org.apache.cassandra.io.sstable.SSTableReader; ++import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.repair.messages.*; import org.apache.cassandra.service.ActiveRepairService; --import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; /** @@@ -128,9 -112,17 +123,17 @@@ public class RepairMessageVerbHandler i break; case ANTICOMPACTION_REQUEST: - logger.debug("Got anticompaction request"); AnticompactionRequest anticompactionRequest = (AnticompactionRequest) message.payload; - ListenableFuture<?> compactionDone = ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession); + logger.debug("Got anticompaction request {}", anticompactionRequest); - ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession, anticompactionRequest.successfulRanges); ++ ListenableFuture<?> compactionDone = ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession, anticompactionRequest.successfulRanges); + compactionDone.addListener(new Runnable() + { + @Override + public void run() + { + MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from); + } + }, MoreExecutors.sameThreadExecutor()); break; default: http://git-wip-us.apache.org/repos/asf/cassandra/blob/a583f70e/src/java/org/apache/cassandra/repair/RepairRunnable.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/repair/RepairRunnable.java index 2900794,0000000..28511db mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/repair/RepairRunnable.java +++ 
b/src/java/org/apache/cassandra/repair/RepairRunnable.java @@@ -1,399 -1,0 +1,399 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair; + +import java.net.InetAddress; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; ++import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.*; +import org.apache.commons.lang3.time.DurationFormatUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor; +import org.apache.cassandra.concurrent.NamedThreadFactory; +import org.apache.cassandra.cql3.QueryOptions; +import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.UntypedResultSet; +import org.apache.cassandra.cql3.statements.SelectStatement; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ConsistencyLevel; +import 
org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.repair.messages.RepairOption; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.tracing.TraceKeyspace; +import org.apache.cassandra.tracing.TraceState; +import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.transport.messages.ResultMessage; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDGen; +import org.apache.cassandra.utils.WrappedRunnable; +import org.apache.cassandra.utils.progress.ProgressEvent; +import org.apache.cassandra.utils.progress.ProgressEventNotifier; +import org.apache.cassandra.utils.progress.ProgressEventType; +import org.apache.cassandra.utils.progress.ProgressListener; + +public class RepairRunnable extends WrappedRunnable implements ProgressEventNotifier +{ + private static final Logger logger = LoggerFactory.getLogger(RepairRunnable.class); + + private StorageService storageService; + private final int cmd; + private final RepairOption options; + private final String keyspace; + + private final List<ProgressListener> listeners = new ArrayList<>(); + + public RepairRunnable(StorageService storageService, int cmd, RepairOption options, String keyspace) + { + this.storageService = storageService; + this.cmd = cmd; + this.options = options; + this.keyspace = keyspace; + } + + @Override + public void addProgressListener(ProgressListener listener) + { + listeners.add(listener); + } + + @Override + public void removeProgressListener(ProgressListener listener) + { + listeners.remove(listener); + } + + protected void fireProgressEvent(String tag, ProgressEvent event) + { + for (ProgressListener listener : listeners) + { + listener.progress(tag, event); + } + } + + protected void fireErrorAndComplete(String 
tag, int progressCount, int totalProgress, String message) + { + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progressCount, totalProgress, message)); + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progressCount, totalProgress)); + } + + protected void runMayThrow() throws Exception + { + final TraceState traceState; + + final String tag = "repair:" + cmd; + + final AtomicInteger progress = new AtomicInteger(); + final int totalProgress = 3 + options.getRanges().size(); // calculate neighbors, validation, prepare for repair + number of ranges to repair + + String[] columnFamilies = options.getColumnFamilies().toArray(new String[options.getColumnFamilies().size()]); + Iterable<ColumnFamilyStore> validColumnFamilies = storageService.getValidColumnFamilies(false, false, keyspace, + columnFamilies); + + final long startTime = System.currentTimeMillis(); + String message = String.format("Starting repair command #%d, repairing keyspace %s with %s", cmd, keyspace, + options); + logger.info(message); + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.START, 0, 100, message)); + if (options.isTraced()) + { + StringBuilder cfsb = new StringBuilder(); + for (ColumnFamilyStore cfs : validColumnFamilies) + cfsb.append(", ").append(cfs.keyspace.getName()).append(".").append(cfs.name); + + UUID sessionId = Tracing.instance.newSession(Tracing.TraceType.REPAIR); + traceState = Tracing.instance.begin("repair", ImmutableMap.of("keyspace", keyspace, "columnFamilies", + cfsb.substring(2))); + Tracing.traceRepair(message); + traceState.enableActivityNotification(tag); + for (ProgressListener listener : listeners) + traceState.addProgressListener(listener); + Thread queryThread = createQueryThread(cmd, sessionId); + queryThread.setName("RepairTracePolling"); + queryThread.start(); + } + else + { + traceState = null; + } + + final Set<InetAddress> allNeighbors = new HashSet<>(); + Map<Range, Set<InetAddress>> rangeToNeighbors = new 
HashMap<>(); + try + { + for (Range<Token> range : options.getRanges()) + { + Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range, + options.getDataCenters(), + options.getHosts()); + rangeToNeighbors.put(range, neighbors); + allNeighbors.addAll(neighbors); + } + progress.incrementAndGet(); + } + catch (IllegalArgumentException e) + { + logger.error("Repair failed:", e); + fireErrorAndComplete(tag, progress.get(), totalProgress, e.getMessage()); + return; + } + + // Validate columnfamilies + List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(); + try + { + Iterables.addAll(columnFamilyStores, validColumnFamilies); + progress.incrementAndGet(); + } + catch (IllegalArgumentException e) + { + fireErrorAndComplete(tag, progress.get(), totalProgress, e.getMessage()); + return; + } + + String[] cfnames = new String[columnFamilyStores.size()]; + for (int i = 0; i < columnFamilyStores.size(); i++) + { + cfnames[i] = columnFamilyStores.get(i).name; + } + + final UUID parentSession = UUIDGen.getTimeUUID(); + SystemDistributedKeyspace.startParentRepair(parentSession, keyspace, cfnames, options.getRanges()); + long repairedAt; + try + { + ActiveRepairService.instance.prepareForRepair(parentSession, allNeighbors, options, columnFamilyStores); + repairedAt = ActiveRepairService.instance.getParentRepairSession(parentSession).repairedAt; + progress.incrementAndGet(); + } + catch (Throwable t) + { + SystemDistributedKeyspace.failParentRepair(parentSession, t); + fireErrorAndComplete(tag, progress.get(), totalProgress, t.getMessage()); + return; + } + + // Set up RepairJob executor for this repair command. 
+ final ListeningExecutorService executor = MoreExecutors.listeningDecorator(new JMXConfigurableThreadPoolExecutor(options.getJobThreads(), + Integer.MAX_VALUE, + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), + new NamedThreadFactory("Repair#" + cmd), + "internal")); + + List<ListenableFuture<RepairSessionResult>> futures = new ArrayList<>(options.getRanges().size()); + for (Range<Token> range : options.getRanges()) + { + final RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession, + range, + keyspace, + options.getParallelism(), + rangeToNeighbors.get(range), + repairedAt, + executor, + cfnames); + if (session == null) + continue; + // After repair session completes, notify client its result + Futures.addCallback(session, new FutureCallback<RepairSessionResult>() + { + public void onSuccess(RepairSessionResult result) + { + String message = String.format("Repair session %s for range %s finished", session.getId(), + session.getRange().toString()); + logger.info(message); + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS, + progress.incrementAndGet(), + totalProgress, + message)); + } + + public void onFailure(Throwable t) + { + String message = String.format("Repair session %s for range %s failed with error %s", + session.getId(), session.getRange().toString(), t.getMessage()); + logger.error(message, t); + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.PROGRESS, + progress.incrementAndGet(), + totalProgress, + message)); + } + }); + futures.add(session); + } + + // After all repair sessions completes(successful or not), + // run anticompaction if necessary and send finish notice back to client ++ final Collection<Range<Token>> successfulRanges = new ArrayList<>(); ++ final AtomicBoolean hasFailure = new AtomicBoolean(); + final ListenableFuture<List<RepairSessionResult>> allSessions = Futures.successfulAsList(futures); - Futures.addCallback(allSessions, new 
FutureCallback<List<RepairSessionResult>>() ++ ListenableFuture anticompactionResult = Futures.transform(allSessions, new AsyncFunction<List<RepairSessionResult>, Object>() + { - public void onSuccess(List<RepairSessionResult> result) ++ @SuppressWarnings("unchecked") ++ public ListenableFuture apply(List<RepairSessionResult> results) throws Exception + { - boolean hasFailure = false; + // filter out null(=failed) results and get successful ranges - Collection<Range<Token>> successfulRanges = new ArrayList<>(); - for (RepairSessionResult sessionResult : result) ++ for (RepairSessionResult sessionResult : results) + { + if (sessionResult != null) + { + successfulRanges.add(sessionResult.range); + } + else + { - hasFailure = true; ++ hasFailure.compareAndSet(false, true); + } + } - try - { - ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successfulRanges); - SystemDistributedKeyspace.successfulParentRepair(parentSession, successfulRanges); - } - catch (Exception e) - { - logger.error("Error in incremental repair", e); - SystemDistributedKeyspace.failParentRepair(parentSession, e); - } - if (hasFailure) ++ return ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successfulRanges); ++ } ++ }); ++ Futures.addCallback(anticompactionResult, new FutureCallback<Object>() ++ { ++ public void onSuccess(Object result) ++ { ++ SystemDistributedKeyspace.successfulParentRepair(parentSession, successfulRanges); ++ if (hasFailure.get()) + { + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, + "Some repair failed")); + } + else + { + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress, + "Repair completed successfully")); + } + repairComplete(); + } + + public void onFailure(Throwable t) + { + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, t.getMessage())); + 
SystemDistributedKeyspace.failParentRepair(parentSession, t); + repairComplete(); + } + + private void repairComplete() + { + String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime, + true, true); + String message = String.format("Repair command #%d finished in %s", cmd, duration); + fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, message)); + logger.info(message); + if (options.isTraced() && traceState != null) + { + for (ProgressListener listener : listeners) + traceState.removeProgressListener(listener); + // Because DebuggableThreadPoolExecutor#afterExecute and this callback + // run in a nondeterministic order (within the same thread), the + // TraceState may have been nulled out at this point. The TraceState + // should be traceState, so just set it without bothering to check if it + // actually was nulled out. + Tracing.instance.set(traceState); + Tracing.traceRepair(message); + Tracing.instance.stopSession(); + } + executor.shutdownNow(); + } + }); + } + + private Thread createQueryThread(final int cmd, final UUID sessionId) + { + return new Thread(new WrappedRunnable() + { + // Query events within a time interval that overlaps the last by one second. Ignore duplicates. Ignore local traces. + // Wake up upon local trace activity. Query when notified of trace activity with a timeout that doubles every two timeouts. + public void runMayThrow() throws Exception + { + TraceState state = Tracing.instance.get(sessionId); + if (state == null) + throw new Exception("no tracestate"); + + String format = "select event_id, source, activity from %s.%s where session_id = ? and event_id > ? 
and event_id < ?;"; + String query = String.format(format, TraceKeyspace.NAME, TraceKeyspace.EVENTS); + SelectStatement statement = (SelectStatement) QueryProcessor.parseStatement(query).prepare().statement; + + ByteBuffer sessionIdBytes = ByteBufferUtil.bytes(sessionId); + InetAddress source = FBUtilities.getBroadcastAddress(); + + HashSet<UUID>[] seen = new HashSet[] { new HashSet<>(), new HashSet<>() }; + int si = 0; + UUID uuid; + + long tlast = System.currentTimeMillis(), tcur; + + TraceState.Status status; + long minWaitMillis = 125; + long maxWaitMillis = 1000 * 1024L; + long timeout = minWaitMillis; + boolean shouldDouble = false; + + while ((status = state.waitActivity(timeout)) != TraceState.Status.STOPPED) + { + if (status == TraceState.Status.IDLE) + { + timeout = shouldDouble ? Math.min(timeout * 2, maxWaitMillis) : timeout; + shouldDouble = !shouldDouble; + } + else + { + timeout = minWaitMillis; + shouldDouble = false; + } + ByteBuffer tminBytes = ByteBufferUtil.bytes(UUIDGen.minTimeUUID(tlast - 1000)); + ByteBuffer tmaxBytes = ByteBufferUtil.bytes(UUIDGen.maxTimeUUID(tcur = System.currentTimeMillis())); + QueryOptions options = QueryOptions.forInternalCalls(ConsistencyLevel.ONE, Lists.newArrayList(sessionIdBytes, + tminBytes, + tmaxBytes)); + ResultMessage.Rows rows = statement.execute(QueryState.forInternalCalls(), options); + UntypedResultSet result = UntypedResultSet.create(rows.result); + + for (UntypedResultSet.Row r : result) + { + if (source.equals(r.getInetAddress("source"))) + continue; + if ((uuid = r.getUUID("event_id")).timestamp() > (tcur - 1000) * 10000) + seen[si].add(uuid); + if (seen[si == 0 ? 1 : 0].contains(uuid)) + continue; + String message = String.format("%s: %s", r.getInetAddress("source"), r.getString("activity")); + fireProgressEvent("repair:" + cmd, + new ProgressEvent(ProgressEventType.NOTIFICATION, 0, 0, message)); + } + tlast = tcur; + + si = si == 0 ? 
1 : 0; + seen[si].clear(); + } + } + }); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/a583f70e/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java index a6ebe89,8d3563c..d350f4e --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@@ -28,12 -27,9 +28,9 @@@ import java.util.concurrent.atomic.Atom import com.google.common.collect.ImmutableMap; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; - import com.google.common.util.concurrent.ListeningExecutorService; - import com.google.common.util.concurrent.MoreExecutors; -- import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; ++import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -52,14 -49,15 +49,14 @@@ import org.apache.cassandra.net.IAsyncC import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.repair.*; -import org.apache.cassandra.repair.messages.PrepareMessage; -import org.apache.cassandra.repair.messages.RepairMessage; -import org.apache.cassandra.repair.messages.SyncComplete; -import org.apache.cassandra.repair.messages.ValidationComplete; ++import org.apache.cassandra.repair.AnticompactionTask; +import org.apache.cassandra.repair.RepairJobDesc; +import org.apache.cassandra.repair.RepairParallelism; +import org.apache.cassandra.repair.RepairSession; +import org.apache.cassandra.repair.messages.*; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.UUIDGen; import org.apache.cassandra.utils.concurrent.Ref; -- 
import org.apache.cassandra.utils.concurrent.Refs; /** @@@ -312,19 -325,29 +309,19 @@@ public class ActiveRepairServic * * @param parentSession Parent session ID * @param neighbors Repair participants (not including self) - * @param doAntiCompaction true if repair session needs anti compaction - * @throws InterruptedException - * @throws ExecutionException + * @param successfulRanges Ranges that repaired successfully - * @throws InterruptedException - * @throws ExecutionException */ - public synchronized void finishParentSession(UUID parentSession, Set<InetAddress> neighbors, Collection<Range<Token>> successfulRanges) throws InterruptedException, ExecutionException - public synchronized ListenableFuture<?> finishParentSession(UUID parentSession, Set<InetAddress> neighbors, boolean doAntiCompaction) throws InterruptedException, ExecutionException ++ public synchronized ListenableFuture finishParentSession(UUID parentSession, Set<InetAddress> neighbors, Collection<Range<Token>> successfulRanges) { - for (InetAddress neighbor : neighbors) - if (doAntiCompaction) -- { - AnticompactionRequest acr = new AnticompactionRequest(parentSession, successfulRanges); - MessageOut<RepairMessage> req = acr.createMessage(); - MessagingService.instance().sendOneWay(req, neighbor); - } - doAntiCompaction(parentSession, successfulRanges).get(); + List<ListenableFuture<?>> tasks = new ArrayList<>(neighbors.size() + 1); + for (InetAddress neighbor : neighbors) + { - AnticompactionTask task = new AnticompactionTask(parentSession, neighbor); ++ AnticompactionTask task = new AnticompactionTask(parentSession, neighbor, successfulRanges); + tasks.add(task); + task.run(); // 'run' is just sending message + } - tasks.add(doAntiCompaction(parentSession)); ++ tasks.add(doAntiCompaction(parentSession, successfulRanges)); + return Futures.successfulAsList(tasks); - } - else - { - removeParentRepairSession(parentSession); - return Futures.immediateFuture(null); - } } public ParentRepairSession 
getParentRepairSession(UUID parentSessionId)
