Improve concurrency of repair process. Patch by yukim; reviewed by krummas for CASSANDRA-6455.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/810c2d5f Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/810c2d5f Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/810c2d5f Branch: refs/heads/trunk Commit: 810c2d5fe64333c0bcfe0b2ed3ea2c8f6aaf89b7 Parents: 5c35f92 Author: Yuki Morishita <[email protected]> Authored: Thu Oct 9 18:12:36 2014 -0500 Committer: Yuki Morishita <[email protected]> Committed: Thu Oct 9 18:12:36 2014 -0500 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/repair/Differencer.java | 136 -------- .../repair/IRepairJobEventListener.java | 31 -- .../apache/cassandra/repair/LocalSyncTask.java | 81 +++++ .../apache/cassandra/repair/RemoteSyncTask.java | 67 ++++ .../apache/cassandra/repair/RepairFuture.java | 31 -- .../org/apache/cassandra/repair/RepairJob.java | 235 +++++++------- .../repair/RepairMessageVerbHandler.java | 7 +- .../apache/cassandra/repair/RepairResult.java | 32 ++ .../apache/cassandra/repair/RepairSession.java | 263 ++++++--------- .../cassandra/repair/RequestCoordinator.java | 128 -------- .../apache/cassandra/repair/SnapshotTask.java | 1 + .../cassandra/repair/StreamingRepairTask.java | 59 ++-- .../org/apache/cassandra/repair/SyncStat.java | 33 ++ .../org/apache/cassandra/repair/SyncTask.java | 83 +++++ .../apache/cassandra/repair/ValidationTask.java | 71 ++++ .../cassandra/repair/messages/RepairOption.java | 290 +++++++++++++++++ .../cassandra/service/ActiveRepairService.java | 129 ++++---- .../cassandra/service/StorageService.java | 320 +++++++++++-------- .../cassandra/service/StorageServiceMBean.java | 23 +- .../cassandra/streaming/StreamReceiveTask.java | 11 +- .../org/apache/cassandra/tools/NodeProbe.java | 122 +------ .../org/apache/cassandra/tools/NodeTool.java | 39 ++- .../apache/cassandra/tools/RepairRunner.java | 101 ++++++ 
.../cassandra/repair/DifferencerTest.java | 164 ---------- .../cassandra/repair/LocalSyncTaskTest.java | 129 ++++++++ .../cassandra/repair/RepairSessionTest.java | 71 ++++ .../repair/messages/RepairOptionTest.java | 88 +++++ .../service/ActiveRepairServiceTest.java | 218 +++++++++++++ .../service/AntiEntropyServiceCounterTest.java | 47 --- .../service/AntiEntropyServiceStandardTest.java | 47 --- .../service/AntiEntropyServiceTestAbstract.java | 282 ---------------- 32 files changed, 1793 insertions(+), 1547 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 432299a..6d67e2e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -29,6 +29,7 @@ * Use unsafe mutations for most unit tests (CASSANDRA-6969) * Fix race condition during calculation of pending ranges (CASSANDRA-7390) * Fail on very large batch sizes (CASSANDRA-8011) + * improve concurrency of repair (CASSANDRA-6455) 2.1.1 http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/Differencer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/Differencer.java b/src/java/org/apache/cassandra/repair/Differencer.java deleted file mode 100644 index 214d2c9..0000000 --- a/src/java/org/apache/cassandra/repair/Differencer.java +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.repair; - -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.List; - -import com.google.common.base.Objects; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.repair.messages.SyncComplete; -import org.apache.cassandra.repair.messages.SyncRequest; -import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.MerkleTree; - -/** - * Runs on the node that initiated a request to compare two trees, and launch repairs for disagreeing ranges. - */ -public class Differencer implements Runnable -{ - private static Logger logger = LoggerFactory.getLogger(Differencer.class); - - private final RepairJobDesc desc; - public final TreeResponse r1; - public final TreeResponse r2; - public final List<Range<Token>> differences = new ArrayList<>(); - - public Differencer(RepairJobDesc desc, TreeResponse r1, TreeResponse r2) - { - this.desc = desc; - this.r1 = r1; - this.r2 = r2; - } - - /** - * Compares our trees, and triggers repairs for any ranges that mismatch. 
- */ - public void run() - { - // compare trees, and collect differences - differences.addAll(MerkleTree.difference(r1.tree, r2.tree)); - - // choose a repair method based on the significance of the difference - String format = String.format("[repair #%s] Endpoints %s and %s %%s for %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily); - if (differences.isEmpty()) - { - logger.info(String.format(format, "are consistent")); - // send back sync complete message - MessagingService.instance().sendOneWay(new SyncComplete(desc, r1.endpoint, r2.endpoint, true).createMessage(), FBUtilities.getLocalAddress()); - return; - } - - // non-0 difference: perform streaming repair - logger.info(String.format(format, "have " + differences.size() + " range(s) out of sync")); - performStreamingRepair(); - } - - /** - * Starts sending/receiving our list of differences to/from the remote endpoint: creates a callback - * that will be called out of band once the streams complete. - */ - void performStreamingRepair() - { - InetAddress local = FBUtilities.getBroadcastAddress(); - // We can take anyone of the node as source or destination, however if one is localhost, we put at source to avoid a forwarding - InetAddress src = r2.endpoint.equals(local) ? r2.endpoint : r1.endpoint; - InetAddress dst = r2.endpoint.equals(local) ? r1.endpoint : r2.endpoint; - - SyncRequest request = new SyncRequest(desc, local, src, dst, differences); - StreamingRepairTask task = new StreamingRepairTask(desc, request); - task.run(); - } - - - /** - * In order to remove completed Differencer, equality is computed only from {@code desc} and - * endpoint part of two TreeResponses. 
- */ - @Override - public boolean equals(Object o) - { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - Differencer that = (Differencer) o; - if (!desc.equals(that.desc)) return false; - return minEndpoint().equals(that.minEndpoint()) && maxEndpoint().equals(that.maxEndpoint()); - } - - @Override - public int hashCode() - { - return Objects.hashCode(desc, minEndpoint(), maxEndpoint()); - } - - // For equals and hashcode, we don't want to take the endpoint order into account. - // So we just order endpoint deterministically to simplify this - private InetAddress minEndpoint() - { - return FBUtilities.compareUnsigned(r1.endpoint.getAddress(), r2.endpoint.getAddress()) < 0 - ? r1.endpoint - : r2.endpoint; - } - - private InetAddress maxEndpoint() - { - return FBUtilities.compareUnsigned(r1.endpoint.getAddress(), r2.endpoint.getAddress()) < 0 - ? r2.endpoint - : r1.endpoint; - } - - public String toString() - { - return "#<Differencer " + r1.endpoint + "<->" + r2.endpoint + ":" + desc.columnFamily + "@" + desc.range + ">"; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/IRepairJobEventListener.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/IRepairJobEventListener.java b/src/java/org/apache/cassandra/repair/IRepairJobEventListener.java deleted file mode 100644 index 778c09d..0000000 --- a/src/java/org/apache/cassandra/repair/IRepairJobEventListener.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.repair; - -/** - * Implemented by the RepairSession to accept callbacks from sequential snapshot creation failure. - */ - -public interface IRepairJobEventListener -{ - /** - * Signal that there was a failure during the snapshot creation process. - * - */ - public void failedSnapshot(); -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/LocalSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/LocalSyncTask.java b/src/java/org/apache/cassandra/repair/LocalSyncTask.java new file mode 100644 index 0000000..38f63ce --- /dev/null +++ b/src/java/org/apache/cassandra/repair/LocalSyncTask.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair; + +import java.net.InetAddress; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.streaming.StreamEvent; +import org.apache.cassandra.streaming.StreamEventHandler; +import org.apache.cassandra.streaming.StreamPlan; +import org.apache.cassandra.streaming.StreamState; +import org.apache.cassandra.utils.FBUtilities; + +/** + * LocalSyncTask performs streaming between local(coordinator) node and remote replica. + */ +public class LocalSyncTask extends SyncTask implements StreamEventHandler +{ + private static final Logger logger = LoggerFactory.getLogger(LocalSyncTask.class); + + private final long repairedAt; + + public LocalSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2, long repairedAt) + { + super(desc, r1, r2); + this.repairedAt = repairedAt; + } + + /** + * Starts sending/receiving our list of differences to/from the remote endpoint: creates a callback + * that will be called out of band once the streams complete. + */ + protected void startSync(List<Range<Token>> differences) + { + InetAddress local = FBUtilities.getBroadcastAddress(); + // We can take anyone of the node as source or destination, however if one is localhost, we put at source to avoid a forwarding + InetAddress dst = r2.endpoint.equals(local) ? 
r1.endpoint : r2.endpoint; + + logger.info(String.format("[repair #%s] Performing streaming repair of %d ranges with %s", desc.sessionId, differences.size(), dst)); + new StreamPlan("Repair", repairedAt, 1).listeners(this) + .flushBeforeTransfer(true) + // request ranges from the remote node + .requestRanges(dst, desc.keyspace, differences, desc.columnFamily) + // send ranges to the remote node + .transferRanges(dst, desc.keyspace, differences, desc.columnFamily) + .execute(); + } + + public void handleStreamEvent(StreamEvent event) { /* noop */ } + + public void onSuccess(StreamState result) + { + logger.info(String.format("[repair #%s] Sync complete between %s and %s on %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily)); + set(stat); + } + + public void onFailure(Throwable t) + { + setException(t); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/RemoteSyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RemoteSyncTask.java b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java new file mode 100644 index 0000000..ca5c998 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/RemoteSyncTask.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair; + +import java.net.InetAddress; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.RepairException; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.repair.messages.SyncRequest; +import org.apache.cassandra.utils.FBUtilities; + +/** + * RemoteSyncTask sends {@link SyncRequest} to remote(non-coordinator) node + * to repair(stream) data with other replica. + * + * When RemoteSyncTask receives SyncComplete from remote node, task completes. + */ +public class RemoteSyncTask extends SyncTask +{ + private static final Logger logger = LoggerFactory.getLogger(RemoteSyncTask.class); + + public RemoteSyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2) + { + super(desc, r1, r2); + } + + protected void startSync(List<Range<Token>> differences) + { + InetAddress local = FBUtilities.getBroadcastAddress(); + SyncRequest request = new SyncRequest(desc, local, r1.endpoint, r2.endpoint, differences); + logger.info(String.format("[repair #%s] Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", desc.sessionId, request.ranges.size(), request.src, request.dst)); + MessagingService.instance().sendOneWay(request.createMessage(), request.src); + } + + public void syncComplete(boolean success) + { + if (success) + { + set(stat); + } + else + { + setException(new RepairException(desc, String.format("Sync failed between %s and %s", r1.endpoint, r2.endpoint))); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/RepairFuture.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairFuture.java 
b/src/java/org/apache/cassandra/repair/RepairFuture.java deleted file mode 100644 index 127d873..0000000 --- a/src/java/org/apache/cassandra/repair/RepairFuture.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.cassandra.repair; - -import java.util.concurrent.FutureTask; - -public class RepairFuture extends FutureTask<Void> -{ - public final RepairSession session; - - public RepairFuture(RepairSession session) - { - super(session, null); - this.session = session; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/RepairJob.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairJob.java b/src/java/org/apache/cassandra/repair/RepairJob.java index 8057ed5..b0d17ab 100644 --- a/src/java/org/apache/cassandra/repair/RepairJob.java +++ b/src/java/org/apache/cassandra/repair/RepairJob.java @@ -18,93 +18,68 @@ package org.apache.cassandra.repair; import java.net.InetAddress; -import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Condition; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import com.google.common.util.concurrent.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.repair.messages.ValidationRequest; +import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.utils.FBUtilities; -import org.apache.cassandra.utils.MerkleTree; -import org.apache.cassandra.utils.concurrent.SimpleCondition; +import org.apache.cassandra.utils.Pair; /** * RepairJob runs repair on given ColumnFamily. 
*/ -public class RepairJob +public class RepairJob extends AbstractFuture<RepairResult> implements Runnable { private static Logger logger = LoggerFactory.getLogger(RepairJob.class); - public final RepairJobDesc desc; + private final RepairSession session; + private final RepairJobDesc desc; private final boolean isSequential; - // first we send tree requests. this tracks the endpoints remaining to hear from - private final RequestCoordinator<InetAddress> treeRequests; - // tree responses are then tracked here - private final List<TreeResponse> trees = new ArrayList<>(); - // once all responses are received, each tree is compared with each other, and differencer tasks - // are submitted. the job is done when all differencers are complete. + private final long repairedAt; private final ListeningExecutorService taskExecutor; - private final Condition requestsSent = new SimpleCondition(); - private int gcBefore = -1; - - private volatile boolean failed = false; - /* Count down as sync completes */ - private AtomicInteger waitForSync; - - private final IRepairJobEventListener listener; /** * Create repair job to run on specific columnfamily + * + * @param session RepairSession that this RepairJob belongs + * @param columnFamily name of the ColumnFamily to repair + * @param isSequential when true, validation runs sequentially among replica + * @param taskExecutor Executor to run various repair tasks */ - public RepairJob(IRepairJobEventListener listener, - UUID parentSessionId, - UUID sessionId, - String keyspace, + public RepairJob(RepairSession session, String columnFamily, - Range<Token> range, boolean isSequential, + long repairedAt, ListeningExecutorService taskExecutor) { - this.listener = listener; - this.desc = new RepairJobDesc(parentSessionId, sessionId, keyspace, columnFamily, range); + this.session = session; + this.desc = new RepairJobDesc(session.parentRepairSession, session.getId(), session.keyspace, columnFamily, session.getRange()); this.isSequential = 
isSequential; + this.repairedAt = repairedAt; this.taskExecutor = taskExecutor; - this.treeRequests = new RequestCoordinator<InetAddress>(isSequential) - { - public void send(InetAddress endpoint) - { - ValidationRequest request = new ValidationRequest(desc, gcBefore); - MessagingService.instance().sendOneWay(request.createMessage(), endpoint); - } - }; } /** - * @return true if this job failed - */ - public boolean isFailed() - { - return failed; - } - - /** - * Send merkle tree request to every involved neighbor. + * Runs repair job. + * + * This sets up necessary task and runs them on given {@code taskExecutor}. + * After submitting all tasks, waits until validation with replica completes. */ - public void sendTreeRequests(Collection<InetAddress> endpoints) + public void run() { - // send requests to all nodes - List<InetAddress> allEndpoints = new ArrayList<>(endpoints); + List<InetAddress> allEndpoints = new ArrayList<>(session.endpoints); allEndpoints.add(FBUtilities.getBroadcastAddress()); + ListenableFuture<List<TreeResponse>> validations; if (isSequential) { + // Request snapshot to all replica List<ListenableFuture<InetAddress>> snapshotTasks = new ArrayList<>(allEndpoints.size()); for (InetAddress endpoint : allEndpoints) { @@ -112,102 +87,110 @@ public class RepairJob snapshotTasks.add(snapshotTask); taskExecutor.execute(snapshotTask); } + // When all snapshot complete, send validation requests ListenableFuture<List<InetAddress>> allSnapshotTasks = Futures.allAsList(snapshotTasks); - // Execute send tree request after all snapshot complete - Futures.addCallback(allSnapshotTasks, new FutureCallback<List<InetAddress>>() + validations = Futures.transform(allSnapshotTasks, new AsyncFunction<List<InetAddress>, List<TreeResponse>>() { - public void onSuccess(List<InetAddress> endpoints) + public ListenableFuture<List<TreeResponse>> apply(List<InetAddress> endpoints) throws Exception { - sendTreeRequestsInternal(endpoints); - } - - public void 
onFailure(Throwable throwable) - { - // TODO need to propagate error to RepairSession - logger.error("Error occurred during snapshot phase", throwable); - listener.failedSnapshot(); - failed = true; + return sendValidationRequest(endpoints); } }, taskExecutor); } else { - sendTreeRequestsInternal(allEndpoints); + // If not sequential, just send validation request to all replica + validations = sendValidationRequest(allEndpoints); } - } - private void sendTreeRequestsInternal(Collection<InetAddress> endpoints) - { - this.gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(System.currentTimeMillis()); - for (InetAddress endpoint : endpoints) - treeRequests.add(endpoint); + // When all validations complete, submit sync tasks + ListenableFuture<List<SyncStat>> syncResults = Futures.transform(validations, new AsyncFunction<List<TreeResponse>, List<SyncStat>>() + { + public ListenableFuture<List<SyncStat>> apply(List<TreeResponse> trees) throws Exception + { + // Unregister from FailureDetector once we've completed synchronizing Merkle trees. + // After this point, we rely on tcp_keepalive for individual sockets to notify us when a connection is down. + // See CASSANDRA-3569 + FailureDetector.instance.unregisterFailureDetectionEventListener(session); - logger.info(String.format("[repair #%s] requesting merkle trees for %s (to %s)", desc.sessionId, desc.columnFamily, endpoints)); - treeRequests.start(); - requestsSent.signalAll(); - } + InetAddress local = FBUtilities.getLocalAddress(); - /** - * Add a new received tree and return the number of remaining tree to - * be received for the job to be complete. - * - * Callers may assume exactly one addTree call will result in zero remaining endpoints. 
- * - * @param endpoint address of the endpoint that sent response - * @param tree sent Merkle tree or null if validation failed on endpoint - * @return the number of responses waiting to receive - */ - public synchronized int addTree(InetAddress endpoint, MerkleTree tree) - { - // Wait for all request to have been performed (see #3400) - try - { - requestsSent.await(); - } - catch (InterruptedException e) + List<SyncTask> syncTasks = new ArrayList<>(); + // We need to difference all trees one against another + for (int i = 0; i < trees.size() - 1; ++i) + { + TreeResponse r1 = trees.get(i); + for (int j = i + 1; j < trees.size(); ++j) + { + TreeResponse r2 = trees.get(j); + SyncTask task; + if (r1.endpoint.equals(local) || r2.endpoint.equals(local)) + { + task = new LocalSyncTask(desc, r1, r2, repairedAt); + } + else + { + task = new RemoteSyncTask(desc, r1, r2); + // RemoteSyncTask expects SyncComplete message sent back. + // Register task to RepairSession to receive response. + session.waitForSync(Pair.create(desc, new NodePair(r1.endpoint, r2.endpoint)), (RemoteSyncTask) task); + } + syncTasks.add(task); + taskExecutor.submit(task); + } + } + return Futures.allAsList(syncTasks); + } + }, taskExecutor); + + // When all sync complete, set the final result + Futures.addCallback(syncResults, new FutureCallback<List<SyncStat>>() { - throw new AssertionError("Interrupted while waiting for requests to be sent"); - } + public void onSuccess(List<SyncStat> stats) + { + logger.info(String.format("[repair #%s] %s is fully synced", session.getId(), desc.columnFamily)); + set(new RepairResult(desc, stats)); + } - if (tree == null) - failed = true; - else - trees.add(new TreeResponse(endpoint, tree)); - return treeRequests.completed(endpoint); + /** + * Snapshot, validation and sync failures are all handled here + */ + public void onFailure(Throwable t) + { + logger.warn(String.format("[repair #%s] %s sync failed", session.getId(), desc.columnFamily)); + setException(t); + } 
+ }, taskExecutor); + + // Wait for validation to complete + Futures.getUnchecked(validations); } /** - * Submit differencers for running. - * All tree *must* have been received before this is called. + * Creates {@link ValidationTask} and submit them to task executor. + * If isSequential flag is true, wait previous ValidationTask to complete before submitting the next. + * + * @param endpoints Endpoint addresses to send validation request + * @return Future that can get all {@link TreeResponse} from replica, if all validation succeed. */ - public void submitDifferencers() + private ListenableFuture<List<TreeResponse>> sendValidationRequest(Collection<InetAddress> endpoints) { - assert !failed; - List<Differencer> differencers = new ArrayList<>(); - // We need to difference all trees one against another - for (int i = 0; i < trees.size() - 1; ++i) + logger.info(String.format("[repair #%s] requesting merkle trees for %s (to %s)", desc.sessionId, desc.columnFamily, endpoints)); + int gcBefore = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily).gcBefore(System.currentTimeMillis()); + List<ListenableFuture<TreeResponse>> tasks = new ArrayList<>(endpoints.size()); + for (InetAddress endpoint : endpoints) { - TreeResponse r1 = trees.get(i); - for (int j = i + 1; j < trees.size(); ++j) + ValidationTask task = new ValidationTask(desc, endpoint, gcBefore); + tasks.add(task); + session.waitForValidation(Pair.create(desc, endpoint), task); + taskExecutor.execute(task); + if (isSequential) { - TreeResponse r2 = trees.get(j); - Differencer differencer = new Differencer(desc, r1, r2); - differencers.add(differencer); - logger.debug("Queueing comparison {}", differencer); + // tasks are sequentially sent so wait until current validation is done. 
+ // NOTE: Wait happens on taskExecutor thread + Futures.getUnchecked(task); } } - waitForSync = new AtomicInteger(differencers.size()); - for (Differencer differencer : differencers) - taskExecutor.submit(differencer); - - trees.clear(); // allows gc to do its thing - } - - /** - * @return true if the given node pair was the last remaining - */ - boolean completedSynchronization() - { - return waitForSync.decrementAndGet() == 0; + return Futures.allAsList(tasks); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java index 54117a3..04a27af 100644 --- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java +++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java @@ -103,7 +103,12 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage> case SYNC_REQUEST: // forwarded sync request SyncRequest request = (SyncRequest) message.payload; - StreamingRepairTask task = new StreamingRepairTask(desc, request); + + long repairedAt = ActiveRepairService.UNREPAIRED_SSTABLE; + if (desc.parentSessionId != null && ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId) != null) + repairedAt = ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).repairedAt; + + StreamingRepairTask task = new StreamingRepairTask(desc, request, repairedAt); task.run(); break; http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/RepairResult.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairResult.java b/src/java/org/apache/cassandra/repair/RepairResult.java new file mode 100644 index 
0000000..259d5f3 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/RepairResult.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair; + +import java.util.List; + +public class RepairResult +{ + public final RepairJobDesc desc; + public final List<SyncStat> stats; + + public RepairResult(RepairJobDesc desc, List<SyncStat> stats) + { + this.desc = desc; + this.stats = stats; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/RepairSession.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java index 9274342..240a21c 100644 --- a/src/java/org/apache/cassandra/repair/RepairSession.java +++ b/src/java/org/apache/cassandra/repair/RepairSession.java @@ -21,13 +21,12 @@ import java.io.IOException; import java.net.InetAddress; import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import 
java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Condition; -import com.google.common.util.concurrent.ListeningExecutorService; -import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,28 +34,30 @@ import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; -import org.apache.cassandra.exceptions.RepairException; import org.apache.cassandra.gms.*; -import org.apache.cassandra.service.ActiveRepairService; -import org.apache.cassandra.utils.*; -import org.apache.cassandra.utils.concurrent.SimpleCondition; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.MerkleTree; +import org.apache.cassandra.utils.Pair; /** * Coordinates the (active) repair of a token range. * * A given RepairSession repairs a set of replicas for a given range on a list * of column families. For each of the column family to repair, RepairSession - * creates a RepairJob that handles the repair of that CF. + * creates a {@link RepairJob} that handles the repair of that CF. * * A given RepairJob has the 2 main phases: - * 1. Validation phase: the job requests merkle trees from each of the replica involves - * (RepairJob.sendTreeRequests()) and waits until all trees are received (in + * <ol> + * <li>Validation phase: the job requests merkle trees from each of the replica involves + * ({@link org.apache.cassandra.repair.ValidationTask}) and waits until all trees are received (in * validationComplete()). - * 2. Synchonization phase: once all trees are received, the job compares each tree with - * all the other using a so-called Differencer (started by submitDifferencers()). 
If - * differences there is between 2 trees, the concerned Differencer will start a streaming - * of the difference between the 2 endpoint concerned (Differencer.performStreamingRepair). - * The job is done once all its Differencer are done (i.e. have either computed no differences + * </li> + * <li>Synchronization phase: once all trees are received, the job compares each tree with + * all the other using a so-called {@link SyncTask}. If there is difference between 2 trees, the + * concerned SyncTask will start a streaming of the difference between the 2 endpoint concerned. + * </li> + * </ol> + * The job is done once all its SyncTasks are done (i.e. have either computed no differences * or the streaming they started is done (syncComplete())). * * A given session will execute the first phase (validation phase) of each of it's job @@ -71,15 +72,15 @@ import org.apache.cassandra.utils.concurrent.SimpleCondition; * we still first send a message to each node to flush and snapshot data so each merkle tree * creation is still done on similar data, even if the actual creation is not * done simulatneously). If not sequential, all merkle tree are requested in parallel. - * Similarly, if a job is sequential, it will handle one Differencer at a time, but will handle + * Similarly, if a job is sequential, it will handle one SyncTask at a time, but will handle * all of them in parallel otherwise. 
*/ -public class RepairSession extends WrappedRunnable implements IEndpointStateChangeSubscriber, - IFailureDetectionEventListener, - IRepairJobEventListener +public class RepairSession extends AbstractFuture<List<RepairResult>> implements IEndpointStateChangeSubscriber, + IFailureDetectionEventListener { private static Logger logger = LoggerFactory.getLogger(RepairSession.class); + public final UUID parentRepairSession; /** Repair session ID */ private final UUID id; public final String keyspace; @@ -88,25 +89,18 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan /** Range to repair */ public final Range<Token> range; public final Set<InetAddress> endpoints; + private final long repairedAt; - private volatile Exception exception; private final AtomicBoolean isFailed = new AtomicBoolean(false); - private final AtomicBoolean fdUnregistered = new AtomicBoolean(false); - - // First, all RepairJobs are added to this queue, - final Queue<RepairJob> jobs = new ConcurrentLinkedQueue<>(); - // and after receiving all validation, the job is moved to - // this map, keyed by CF name. - final Map<String, RepairJob> syncingJobs = new ConcurrentHashMap<>(); + // Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address) + private final ConcurrentMap<Pair<RepairJobDesc, InetAddress>, ValidationTask> validating = new ConcurrentHashMap<>(); + // Remote syncing jobs wait response in syncingTasks map + private final ConcurrentMap<Pair<RepairJobDesc, NodePair>, RemoteSyncTask> syncingTasks = new ConcurrentHashMap<>(); // Tasks(snapshot, validate request, differencing, ...) 
are run on taskExecutor private final ListeningExecutorService taskExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool(new NamedThreadFactory("RepairJobTask"))); - private final SimpleCondition completed = new SimpleCondition(); - public final Condition differencingDone = new SimpleCondition(); - public final UUID parentRepairSession; - private volatile boolean terminated = false; /** @@ -118,21 +112,25 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan * @param endpoints the data centers that should be part of the repair; null for all DCs * @param cfnames names of columnfamilies */ - public RepairSession(UUID parentRepairSession, Range<Token> range, String keyspace, boolean isSequential, Set<InetAddress> endpoints, String... cfnames) + public RepairSession(UUID parentRepairSession, + UUID id, + Range<Token> range, + String keyspace, + boolean isSequential, + Set<InetAddress> endpoints, + long repairedAt, + String... cfnames) { - this(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, isSequential, endpoints, cfnames); - } + assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it"; - public RepairSession(UUID parentRepairSession, UUID id, Range<Token> range, String keyspace, boolean isSequential, Set<InetAddress> endpoints, String[] cfnames) - { this.parentRepairSession = parentRepairSession; this.id = id; this.isSequential = isSequential; this.keyspace = keyspace; this.cfnames = cfnames; - assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it"; this.range = range; this.endpoints = endpoints; + this.repairedAt = repairedAt; } public UUID getId() @@ -145,6 +143,16 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan return range; } + public void waitForValidation(Pair<RepairJobDesc, InetAddress> key, ValidationTask task) + { + validating.put(key, task); + } + + public void waitForSync(Pair<RepairJobDesc, 
NodePair> key, RemoteSyncTask task) + { + syncingTasks.put(key, task); + } + /** * Receive merkle tree response or failed response from {@code endpoint} for current repair job. * @@ -154,52 +162,15 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan */ public void validationComplete(RepairJobDesc desc, InetAddress endpoint, MerkleTree tree) { - RepairJob job = jobs.peek(); - if (job == null) + ValidationTask task = validating.remove(Pair.create(desc, endpoint)); + if (task == null) { assert terminated; return; } - if (tree == null) - { - exception = new RepairException(desc, "Validation failed in " + endpoint); - forceShutdown(); - return; - } - logger.info(String.format("[repair #%s] Received merkle tree for %s from %s", getId(), desc.columnFamily, endpoint)); - - assert job.desc.equals(desc); - if (job.addTree(endpoint, tree) == 0) - { - logger.debug("All responses received for {}/{}", getId(), desc.columnFamily); - if (!job.isFailed()) - { - syncingJobs.put(job.desc.columnFamily, job); - job.submitDifferencers(); - } - - // This job is complete, switching to next in line (note that only one thread will ever do this) - jobs.poll(); - RepairJob nextJob = jobs.peek(); - if (nextJob == null) - { - // Unregister from FailureDetector once we've completed synchronizing Merkle trees. - // After this point, we rely on tcp_keepalive for individual sockets to notify us when a connection is down. - // See CASSANDRA-3569 - if (fdUnregistered.compareAndSet(false, true)) - FailureDetector.instance.unregisterFailureDetectionEventListener(this); - - // We are done with this repair session as far as differencing - // is considered. 
Just inform the session - differencingDone.signalAll(); - } - else - { - nextJob.sendTreeRequests(endpoints); - } - } + task.treeReceived(tree); } /** @@ -211,38 +182,15 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan */ public void syncComplete(RepairJobDesc desc, NodePair nodes, boolean success) { - RepairJob job = syncingJobs.get(desc.columnFamily); - if (job == null) + RemoteSyncTask task = syncingTasks.get(Pair.create(desc, nodes)); + if (task == null) { assert terminated; return; } - if (!success) - { - exception = new RepairException(desc, String.format("Sync failed between %s and %s", nodes.endpoint1, nodes.endpoint2)); - forceShutdown(); - return; - } - logger.debug(String.format("[repair #%s] Repair completed between %s and %s on %s", getId(), nodes.endpoint1, nodes.endpoint2, desc.columnFamily)); - - if (job.completedSynchronization()) - { - RepairJob completedJob = syncingJobs.remove(job.desc.columnFamily); - String remaining = syncingJobs.size() == 0 ? "" : String.format(" (%d remaining table to sync for this session)", syncingJobs.size()); - if (completedJob != null && completedJob.isFailed()) - logger.warn(String.format("[repair #%s] %s sync failed%s", getId(), desc.columnFamily, remaining)); - else - logger.info(String.format("[repair #%s] %s is fully synced%s", getId(), desc.columnFamily, remaining)); - - if (jobs.isEmpty() && syncingJobs.isEmpty()) - { - taskExecutor.shutdown(); - // this repair session is completed - completed.signalAll(); - } - } + task.syncComplete(success); } private String repairedNodes() @@ -254,15 +202,25 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan return sb.toString(); } - // we don't care about the return value but care about it throwing exception - public void runMayThrow() throws Exception + /** + * Start RepairJob on given ColumnFamilies. 
+ * + * This first validates if all replica are available, and if they are, + * creates RepairJobs and submit to run on given executor. + * + * @param executor Executor to run validation + */ + public void start(ListeningExecutorService executor) { + if (terminated) + return; + logger.info(String.format("[repair #%s] new session: will sync %s on range %s for %s.%s", getId(), repairedNodes(), range, keyspace, Arrays.toString(cfnames))); if (endpoints.isEmpty()) { - differencingDone.signalAll(); logger.info(String.format("[repair #%s] No neighbors to repair with on range %s: session completed", getId(), range)); + set(Lists.<RepairResult>newArrayList()); return; } @@ -272,85 +230,59 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan if (!FailureDetector.instance.isAlive(endpoint)) { String message = String.format("Cannot proceed on repair because a neighbor (%s) is dead: session failed", endpoint); - differencingDone.signalAll(); logger.error("[repair #{}] {}", getId(), message); - throw new IOException(message); + setException(new IOException(message)); + return; } } - ActiveRepairService.instance.addToActiveSessions(this); - try + // Create and submit RepairJob for each ColumnFamily + List<ListenableFuture<RepairResult>> jobs = new ArrayList<>(cfnames.length); + for (String cfname : cfnames) { - // Create and queue a RepairJob for each column family - for (String cfname : cfnames) - { - RepairJob job = new RepairJob(this, parentRepairSession, id, keyspace, cfname, range, isSequential, taskExecutor); - jobs.offer(job); - } - logger.debug("Sending tree requests to endpoints {}", endpoints); - jobs.peek().sendTreeRequests(endpoints); - - // block whatever thread started this session until all requests have been returned: - // if this thread dies, the session will still complete in the background - completed.await(); + RepairJob job = new RepairJob(this, cfname, isSequential, repairedAt, taskExecutor); + executor.execute(job); + 
jobs.add(job); + } - if (exception == null) + // When all RepairJobs are done without error, cleanup and set the final result + Futures.addCallback(Futures.allAsList(jobs), new FutureCallback<List<RepairResult>>() + { + public void onSuccess(List<RepairResult> results) { + // this repair session is completed logger.info(String.format("[repair #%s] session completed successfully", getId())); + set(results); + taskExecutor.shutdown(); + // mark this session as terminated + terminate(); } - else + + public void onFailure(Throwable t) { - logger.error(String.format("[repair #%s] session completed with the following error", getId()), exception); - throw exception; + logger.error("Repair job failed", t); + setException(t); } - } - catch (InterruptedException e) - { - throw new RuntimeException("Interrupted while waiting for repair."); - } - finally - { - // mark this session as terminated - terminate(); - - ActiveRepairService.instance.removeFromActiveSessions(this); - - // If we've reached here in an exception state without completing Merkle Tree sync, we'll still be registered - // with the FailureDetector. - if (fdUnregistered.compareAndSet(false, true)) - FailureDetector.instance.unregisterFailureDetectionEventListener(this); - } + }); } public void terminate() { terminated = true; - jobs.clear(); - syncingJobs.clear(); + validating.clear(); + syncingTasks.clear(); } /** * clear all RepairJobs and terminate this session. 
+ * + * @param reason Cause of error for shutdown */ - public void forceShutdown() + public void forceShutdown(Throwable reason) { + setException(reason); taskExecutor.shutdownNow(); - differencingDone.signalAll(); - completed.signalAll(); - } - - public void failedSnapshot() - { - exception = new IOException("Failed during snapshot creation."); - forceShutdown(); - } - - void failedNode(InetAddress remote) - { - String errorMsg = String.format("Endpoint %s died", remote); - exception = new IOException(errorMsg); - // If a node failed during Merkle creation, we stop everything (though there could still be some activity in the background) - forceShutdown(); + terminate(); } public void onJoin(InetAddress endpoint, EndpointState epState) {} @@ -383,6 +315,9 @@ public class RepairSession extends WrappedRunnable implements IEndpointStateChan if (!isFailed.compareAndSet(false, true)) return; - failedNode(endpoint); + Exception exception = new IOException(String.format("Endpoint %s died", endpoint)); + logger.error(String.format("[repair #%s] session completed with the following error", getId()), exception); + // If a node failed, we stop everything (though there could still be some activity in the background) + forceShutdown(exception); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/RequestCoordinator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/RequestCoordinator.java b/src/java/org/apache/cassandra/repair/RequestCoordinator.java deleted file mode 100644 index ed089ef..0000000 --- a/src/java/org/apache/cassandra/repair/RequestCoordinator.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.repair; - -import java.util.HashSet; -import java.util.LinkedList; -import java.util.Queue; -import java.util.Set; - -/** -*/ -public abstract class RequestCoordinator<R> -{ - private final Order<R> orderer; - - public RequestCoordinator(boolean isSequential) - { - this.orderer = isSequential ? new SequentialOrder(this) : new ParallelOrder(this); - } - - public abstract void send(R request); - - public void add(R request) - { - orderer.add(request); - } - - public void start() - { - orderer.start(); - } - - // Returns how many request remains - public int completed(R request) - { - return orderer.completed(request); - } - - private static abstract class Order<R> - { - protected final RequestCoordinator<R> coordinator; - - Order(RequestCoordinator<R> coordinator) - { - this.coordinator = coordinator; - } - - public abstract void add(R request); - public abstract void start(); - public abstract int completed(R request); - } - - private static class SequentialOrder<R> extends Order<R> - { - private final Queue<R> requests = new LinkedList<>(); - - SequentialOrder(RequestCoordinator<R> coordinator) - { - super(coordinator); - } - - public void add(R request) - { - requests.add(request); - } - - public void start() - { - if (requests.isEmpty()) - return; - - coordinator.send(requests.peek()); - } - - public int completed(R request) - { - assert 
request.equals(requests.peek()); - requests.poll(); - int remaining = requests.size(); - if (remaining != 0) - coordinator.send(requests.peek()); - return remaining; - } - } - - private static class ParallelOrder<R> extends Order<R> - { - private final Set<R> requests = new HashSet<>(); - - ParallelOrder(RequestCoordinator<R> coordinator) - { - super(coordinator); - } - - public void add(R request) - { - requests.add(request); - } - - public void start() - { - for (R request : requests) - coordinator.send(request); - } - - public int completed(R request) - { - requests.remove(request); - return requests.size(); - } - } - -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/SnapshotTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SnapshotTask.java b/src/java/org/apache/cassandra/repair/SnapshotTask.java index 6c3afb1..a87643c 100644 --- a/src/java/org/apache/cassandra/repair/SnapshotTask.java +++ b/src/java/org/apache/cassandra/repair/SnapshotTask.java @@ -74,6 +74,7 @@ public class SnapshotTask extends AbstractFuture<InetAddress> implements Runnabl public void onFailure(InetAddress from) { + //listener.failedSnapshot(); task.setException(new RuntimeException("Could not create snapshot at " + from)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/StreamingRepairTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java index 9af949d..f30eb6f 100644 --- a/src/java/org/apache/cassandra/repair/StreamingRepairTask.java +++ b/src/java/org/apache/cassandra/repair/StreamingRepairTask.java @@ -23,59 +23,40 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.net.MessagingService; import 
org.apache.cassandra.repair.messages.SyncComplete; import org.apache.cassandra.repair.messages.SyncRequest; -import org.apache.cassandra.service.ActiveRepairService; -import org.apache.cassandra.streaming.*; -import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.streaming.StreamEvent; +import org.apache.cassandra.streaming.StreamEventHandler; +import org.apache.cassandra.streaming.StreamPlan; +import org.apache.cassandra.streaming.StreamState; /** - * Task that make two nodes exchange (stream) some ranges (for a given table/cf). - * This handle the case where the local node is neither of the two nodes that - * must stream their range, and allow to register a callback to be called on - * completion. + * StreamingRepairTask performs data streaming between two remote replicas, neither of which is the repair coordinator. + * The task sends a {@link SyncComplete} message back to the coordinator upon streaming completion. */ public class StreamingRepairTask implements Runnable, StreamEventHandler { private static final Logger logger = LoggerFactory.getLogger(StreamingRepairTask.class); - /** Repair session ID that this streaming task belongs */ - public final RepairJobDesc desc; - public final SyncRequest request; + private final RepairJobDesc desc; + private final SyncRequest request; + private final long repairedAt; - public StreamingRepairTask(RepairJobDesc desc, SyncRequest request) + public StreamingRepairTask(RepairJobDesc desc, SyncRequest request, long repairedAt) { this.desc = desc; this.request = request; + this.repairedAt = repairedAt; } public void run() { - if (request.src.equals(FBUtilities.getBroadcastAddress())) - initiateStreaming(); - else - forwardToSource(); - } - - private void initiateStreaming() - { - long repairedAt = ActiveRepairService.UNREPAIRED_SSTABLE; - if (desc.parentSessionId != null && ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId) != null) - repairedAt = 
ActiveRepairService.instance.getParentRepairSession(desc.parentSessionId).repairedAt; - logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", desc.sessionId, request.ranges.size(), request.dst)); - StreamResultFuture op = new StreamPlan("Repair", repairedAt, 1) - .flushBeforeTransfer(true) - // request ranges from the remote node - .requestRanges(request.dst, desc.keyspace, request.ranges, desc.columnFamily) - // send ranges to the remote node - .transferRanges(request.dst, desc.keyspace, request.ranges, desc.columnFamily) - .execute(); - op.addEventListener(this); - } - - private void forwardToSource() - { - logger.info(String.format("[repair #%s] Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", desc.sessionId, request.ranges.size(), request.src, request.dst)); - MessagingService.instance().sendOneWay(request.createMessage(), request.src); + new StreamPlan("Repair", repairedAt, 1).listeners(this) + .flushBeforeTransfer(true) + // request ranges from the remote node + .requestRanges(request.dst, desc.keyspace, request.ranges, desc.columnFamily) + // send ranges to the remote node + .transferRanges(request.dst, desc.keyspace, request.ranges, desc.columnFamily) + .execute(); } public void handleStreamEvent(StreamEvent event) @@ -85,7 +66,7 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler } /** - * If we succeeded on both stream in and out, reply back to the initiator. + * If we succeeded on both stream in and out, reply back to coordinator */ public void onSuccess(StreamState state) { @@ -94,7 +75,7 @@ public class StreamingRepairTask implements Runnable, StreamEventHandler } /** - * If we failed on either stream in or out, reply fail to the initiator. 
+ * If we failed on either stream in or out, reply fail to coordinator */ public void onFailure(Throwable t) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/SyncStat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SyncStat.java b/src/java/org/apache/cassandra/repair/SyncStat.java new file mode 100644 index 0000000..5721a20 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/SyncStat.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.repair; + +/** + * Statistics about synchronizing two replicas + */ +public class SyncStat +{ + public final NodePair nodes; + public final long numberOfDifferences; + + public SyncStat(NodePair nodes, long numberOfDifferences) + { + this.nodes = nodes; + this.numberOfDifferences = numberOfDifferences; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/SyncTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/SyncTask.java b/src/java/org/apache/cassandra/repair/SyncTask.java new file mode 100644 index 0000000..3ce5532 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/SyncTask.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.cassandra.repair; + +import java.util.ArrayList; +import java.util.List; + +import com.google.common.util.concurrent.AbstractFuture; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.utils.MerkleTree; + +/** + * SyncTask will calculate the difference of MerkleTree between two nodes + * and perform necessary operation to repair replica. + */ +public abstract class SyncTask extends AbstractFuture<SyncStat> implements Runnable +{ + private static Logger logger = LoggerFactory.getLogger(SyncTask.class); + + protected final RepairJobDesc desc; + protected final TreeResponse r1; + protected final TreeResponse r2; + + protected volatile SyncStat stat; + + public SyncTask(RepairJobDesc desc, TreeResponse r1, TreeResponse r2) + { + this.desc = desc; + this.r1 = r1; + this.r2 = r2; + } + + /** + * Compares trees, and triggers repairs for any ranges that mismatch. 
+ */ + public void run() + { + // compare trees, and collect differences + List<Range<Token>> differences = new ArrayList<>(); + differences.addAll(MerkleTree.difference(r1.tree, r2.tree)); + + stat = new SyncStat(new NodePair(r1.endpoint, r2.endpoint), differences.size()); + + // choose a repair method based on the significance of the difference + String format = String.format("[repair #%s] Endpoints %s and %s %%s for %s", desc.sessionId, r1.endpoint, r2.endpoint, desc.columnFamily); + if (differences.isEmpty()) + { + logger.info(String.format(format, "are consistent")); + set(stat); + return; + } + + // non-0 difference: perform streaming repair + logger.info(String.format(format, "have " + differences.size() + " range(s) out of sync")); + startSync(differences); + } + + public SyncStat getCurrentStat() + { + return stat; + } + + protected abstract void startSync(List<Range<Token>> differences); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/ValidationTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/ValidationTask.java b/src/java/org/apache/cassandra/repair/ValidationTask.java new file mode 100644 index 0000000..a52ec4f --- /dev/null +++ b/src/java/org/apache/cassandra/repair/ValidationTask.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair; + +import java.net.InetAddress; + +import com.google.common.util.concurrent.AbstractFuture; + +import org.apache.cassandra.exceptions.RepairException; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.repair.messages.ValidationRequest; +import org.apache.cassandra.utils.MerkleTree; + +/** + * ValidationTask sends {@link ValidationRequest} to a replica. + * When a replica sends back message, task completes. + */ +public class ValidationTask extends AbstractFuture<TreeResponse> implements Runnable +{ + private final RepairJobDesc desc; + private final InetAddress endpoint; + private final int gcBefore; + + public ValidationTask(RepairJobDesc desc, InetAddress endpoint, int gcBefore) + { + this.desc = desc; + this.endpoint = endpoint; + this.gcBefore = gcBefore; + } + + /** + * Send ValidationRequest to replica + */ + public void run() + { + ValidationRequest request = new ValidationRequest(desc, gcBefore); + MessagingService.instance().sendOneWay(request.createMessage(), endpoint); + } + + /** + * Receive MerkleTree from replica node. + * + * @param tree MerkleTree that is sent from replica. Null if validation failed on replica node. 
+ */ + public void treeReceived(MerkleTree tree) + { + if (tree == null) + { + setException(new RepairException(desc, "Validation failed in " + endpoint)); + } + else + { + set(new TreeResponse(endpoint, tree)); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/repair/messages/RepairOption.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/repair/messages/RepairOption.java b/src/java/org/apache/cassandra/repair/messages/RepairOption.java new file mode 100644 index 0000000..ca02365 --- /dev/null +++ b/src/java/org/apache/cassandra/repair/messages/RepairOption.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair.messages; + +import java.util.*; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.utils.FBUtilities; + +/** + * Repair options. 
+ */ +public class RepairOption +{ + public static final String SEQUENTIAL_KEY = "sequential"; + public static final String PRIMARY_RANGE_KEY = "primaryRange"; + public static final String INCREMENTAL_KEY = "incremental"; + public static final String JOB_THREADS_KEY = "jobThreads"; + public static final String RANGES_KEY = "ranges"; + public static final String COLUMNFAMILIES_KEY = "columnFamilies"; + public static final String DATACENTERS_KEY = "dataCenters"; + public static final String HOSTS_KEY = "hosts"; + + // we don't want to push nodes too much for repair + public static final int MAX_JOB_THREADS = 4; + + private static final Logger logger = LoggerFactory.getLogger(RepairOption.class); + + /** + * Construct RepairOptions object from given map of Strings. + * <p> + * Available options are: + * + * <table> + * <thead> + * <tr> + * <th>key</th> + * <th>value</th> + * <th>default (when key not given)</th> + * </tr> + * </thead> + * <tbody> + * <tr> + * <td>sequential</td> + * <td>"true" if perform sequential repair.</td> + * <td>true</td> + * </tr> + * <tr> + * <td>primaryRange</td> + * <td>"true" if perform repair only on primary range.</td> + * <td>false</td> + * </tr> + * <tr> + * <td>incremental</td> + * <td>"true" if perform incremental repair.</td> + * <td>false</td> + * </tr> + * <tr> + * <td>jobThreads</td> + * <td>Number of threads to use to run repair job.</td> + * <td>1</td> + * </tr> + * <tr> + * <td>ranges</td> + * <td>Ranges to repair. A range is expressed as <start token>:<end token> + * and multiple ranges can be given as comma separated ranges(e.g. aaa:bbb,ccc:ddd).</td> + * <td></td> + * </tr> + * <tr> + * <td>columnFamilies</td> + * <td>Specify names of ColumnFamilies to repair. + * Multiple ColumnFamilies can be given as comma separated values(e.g. cf1,cf2,cf3).</td> + * <td></td> + * </tr> + * <tr> + * <td>dataCenters</td> + * <td>Specify names of data centers who participate in this repair. 
+ * Multiple data centers can be given as comma separated values(e.g. dc1,dc2,dc3).</td> + * <td></td> + * </tr> + * <tr> + * <td>hosts</td> + * <td>Specify names of hosts who participate in this repair. + * Multiple hosts can be given as comma separated values(e.g. cass1,cass2).</td> + * <td></td> + * </tr> + * </tbody> + * </table> + * + * @param options options to parse + * @param partitioner partitioner is used to construct token ranges + * @return RepairOptions object + */ + public static RepairOption parse(Map<String, String> options, IPartitioner partitioner) + { + boolean sequential = !options.containsKey(SEQUENTIAL_KEY) || Boolean.parseBoolean(options.get(SEQUENTIAL_KEY)); + boolean primaryRange = Boolean.parseBoolean(options.get(PRIMARY_RANGE_KEY)); + boolean incremental = Boolean.parseBoolean(options.get(INCREMENTAL_KEY)); + + int jobThreads = 1; + if (options.containsKey(JOB_THREADS_KEY)) + { + try + { + jobThreads = Integer.parseInt(options.get(JOB_THREADS_KEY)); + } + catch (NumberFormatException ignore) {} + } + // ranges + String rangesStr = options.get(RANGES_KEY); + Set<Range<Token>> ranges = new HashSet<>(); + if (rangesStr != null) + { + StringTokenizer tokenizer = new StringTokenizer(rangesStr, ","); + while (tokenizer.hasMoreTokens()) + { + String[] rangeStr = tokenizer.nextToken().split(":", 2); + if (rangeStr.length < 2) + { + continue; + } + Token parsedBeginToken = partitioner.getTokenFactory().fromString(rangeStr[0].trim()); + Token parsedEndToken = partitioner.getTokenFactory().fromString(rangeStr[1].trim()); + ranges.add(new Range<>(parsedBeginToken, parsedEndToken)); + } + } + + RepairOption option = new RepairOption(sequential, primaryRange, incremental, jobThreads, ranges); + + // data centers + String dataCentersStr = options.get(DATACENTERS_KEY); + Collection<String> dataCenters = new HashSet<>(); + if (dataCentersStr != null) + { + StringTokenizer tokenizer = new StringTokenizer(dataCentersStr, ","); + while 
(tokenizer.hasMoreTokens()) + { + dataCenters.add(tokenizer.nextToken().trim()); + } + } + option.getDataCenters().addAll(dataCenters); + + // hosts + String hostsStr = options.get(HOSTS_KEY); + Collection<String> hosts = new HashSet<>(); + if (hostsStr != null) + { + StringTokenizer tokenizer = new StringTokenizer(hostsStr, ","); + while (tokenizer.hasMoreTokens()) + { + hosts.add(tokenizer.nextToken().trim()); + } + } + option.getHosts().addAll(hosts); + + // columnfamilies + String cfStr = options.get(COLUMNFAMILIES_KEY); + Collection<String> columnFamilies = new HashSet<>(); + if (cfStr != null) + { + StringTokenizer tokenizer = new StringTokenizer(cfStr, ","); + while (tokenizer.hasMoreTokens()) + { + columnFamilies.add(tokenizer.nextToken().trim()); + } + } + option.getColumnFamilies().addAll(columnFamilies); + + // validate options + if (jobThreads > MAX_JOB_THREADS) + { + throw new IllegalArgumentException("Too many job threads. Max is " + MAX_JOB_THREADS); + } + if (primaryRange && (!dataCenters.isEmpty() || !hosts.isEmpty())) + { + throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster."); + } + + return option; + } + + private final boolean sequential; + private final boolean primaryRange; + private final boolean incremental; + private final int jobThreads; + + private final Collection<String> columnFamilies = new HashSet<>(); + private final Collection<String> dataCenters = new HashSet<>(); + private final Collection<String> hosts = new HashSet<>(); + private final Collection<Range<Token>> ranges = new HashSet<>(); + + public RepairOption(boolean sequential, boolean primaryRange, boolean incremental, int jobThreads, Collection<Range<Token>> ranges) + { + if (sequential && incremental) + { + String message = "It is not possible to mix sequential repair and incremental repairs."; + logger.error(message); + throw new IllegalArgumentException(message); + } + + if (!FBUtilities.isUnix() && sequential) + { + 
logger.warn("Snapshot-based repair is not yet supported on Windows. Reverting to parallel repair."); + this.sequential = false; + } + else + { + this.sequential = sequential; + } + this.primaryRange = primaryRange; + this.incremental = incremental; + this.jobThreads = jobThreads; + this.ranges.addAll(ranges); + } + + public boolean isSequential() + { + return sequential; + } + + public boolean isPrimaryRange() + { + return primaryRange; + } + + public boolean isIncremental() + { + return incremental; + } + + public int getJobThreads() + { + return jobThreads; + } + + public Collection<String> getColumnFamilies() + { + return columnFamilies; + } + + public Collection<Range<Token>> getRanges() + { + return ranges; + } + + public Collection<String> getDataCenters() + { + return dataCenters; + } + + public Collection<String> getHosts() + { + return hosts; + } + + @Override + public String toString() + { + return "repair options (" + + "sequential: " + sequential + + ", primary range: " + primaryRange + + ", incremental: " + incremental + + ", job threads: " + jobThreads + + ", ColumnFamilies: " + columnFamilies + + ", dataCenters: " + dataCenters + + ", hosts: " + hosts + + ", # of ranges: " + ranges.size() + + ')'; + } +}
