Author: slebresne Date: Fri Sep 2 10:24:21 2011 New Revision: 1164463 URL: http://svn.apache.org/viewvc?rev=1164463&view=rev Log: Make repair of a range sync all replica pairs for this range patch by slebresne; reviewed by jbellis for CASSANDRA-2610
Added: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java Modified: cassandra/trunk/CHANGES.txt cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java cassandra/trunk/src/java/org/apache/cassandra/utils/UUIDGen.java cassandra/trunk/test/unit/org/apache/cassandra/io/CompactSerializerTest.java cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java Modified: cassandra/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/cassandra/trunk/CHANGES.txt?rev=1164463&r1=1164462&r2=1164463&view=diff ============================================================================== --- cassandra/trunk/CHANGES.txt (original) +++ cassandra/trunk/CHANGES.txt Fri Sep 2 10:24:21 2011 @@ -55,6 +55,7 @@ to be down before starting (CASSANDRA-2034) * Make the compression algorithm and chunk length configurable (CASSANDRA-3001) * Add throttling for internode streaming (CASSANDRA-3080) + * make the repair of a range repair all replica (CASSANDRA-2610) 0.8.5 * fix NPE when encryption_options is unspecified (CASSANDRA-3007) Modified: cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java?rev=1164463&r1=1164462&r2=1164463&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/net/MessagingService.java Fri Sep 2 10:24:21 2011 @@ -63,9 +63,11 @@ public final class MessagingService impl { public static final String MBEAN_NAME = "org.apache.cassandra.net:type=MessagingService"; + // 8 bits version, so don't waste versions public static final int VERSION_07 = 1; public static final 
int VERSION_080 = 2; - public static final int version_ = 3; // 8 bits, so don't waste versions + public static final int VERSION_10 = 3; + public static final int version_ = VERSION_10; static SerializerType serializerType_ = SerializerType.BINARY; Modified: cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java?rev=1164463&r1=1164462&r2=1164463&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/AntiEntropyService.java Fri Sep 2 10:24:21 2011 @@ -24,7 +24,6 @@ import java.security.MessageDigest; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import com.google.common.base.Objects; @@ -44,17 +43,13 @@ import org.apache.cassandra.dht.RandomPa import org.apache.cassandra.dht.Range; import org.apache.cassandra.gms.*; import org.apache.cassandra.io.ICompactSerializer; -import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.util.FastByteArrayInputStream; import org.apache.cassandra.io.util.FastByteArrayOutputStream; import org.apache.cassandra.net.CompactEndpointSerializationHelper; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.streaming.OperationType; -import org.apache.cassandra.streaming.StreamIn; -import org.apache.cassandra.streaming.StreamOut; -import org.apache.cassandra.streaming.StreamOutSession; +import org.apache.cassandra.streaming.*; import org.apache.cassandra.utils.*; /** @@ -130,6 +125,8 @@ public class AntiEntropyService return futureTask; } 
+ // for testing only. Create a session corresponding to a fake request and + // add it to the sessions (avoid NPE in tests) RepairFuture submitArtificialRepairSession(TreeRequest req, String tablename, String... cfnames) { RepairFuture futureTask = new RepairSession(req, tablename, cfnames).getFuture(); @@ -171,6 +168,8 @@ public class AntiEntropyService RepairSession session = sessions.get(request.sessionid); assert session != null; + logger.info(String.format("[repair #%s] Received merkle tree for %s from %s", session.getName(), request.cf.right, request.endpoint)); + RepairSession.RepairJob job = session.jobs.peek(); assert job != null : "A repair should have at least some jobs scheduled"; @@ -214,12 +213,13 @@ public class AntiEntropyService try { Message message = TreeResponseVerbHandler.makeVerb(local, validator); - logger.info("Sending AEService tree for " + validator.request); + if (!validator.request.endpoint.equals(FBUtilities.getBroadcastAddress())) + logger.info(String.format("[repair #%s] Sending completed merkle tree to %s for %s", validator.request.sessionid, validator.request.endpoint, validator.request.cf)); ms.sendOneWay(message, validator.request.endpoint); } catch (Exception e) { - logger.error("Could not send valid tree for request " + validator.request, e); + logger.error(String.format("[repair #%s] Error sending completed merkle tree to %s for %s ", validator.request.sessionid, validator.request.endpoint, validator.request.cf), e); } } @@ -534,17 +534,6 @@ public class AntiEntropyService } /** - * A tuple of a local and remote tree. - */ - static class TreePair extends Pair<MerkleTree,MerkleTree> - { - public TreePair(MerkleTree local, MerkleTree remote) - { - super(local, remote); - } - } - - /** * A tuple of table, cf, address and range that represents a location we have an outstanding TreeRequest for. 
*/ public static class TreeRequest @@ -613,7 +602,7 @@ public class AntiEntropyService public RepairSession(Range range, String tablename, String... cfnames) { - this("manual-repair-" + UUID.randomUUID(), range, tablename, cfnames); + this(UUIDGen.makeType1UUIDFromHost(FBUtilities.getBroadcastAddress()).toString(), range, tablename, cfnames); } private RepairSession(String id, Range range, String tablename, String[] cfnames) @@ -636,13 +625,24 @@ public class AntiEntropyService return new RepairFuture(this); } + private String repairedNodes() + { + StringBuilder sb = new StringBuilder(); + sb.append(FBUtilities.getBroadcastAddress()); + for (InetAddress ep : endpoints) + sb.append(", ").append(ep); + return sb.toString(); + } + // we don't care about the return value but care about it throwing exception public void runMayThrow() throws Exception { + logger.info(String.format("[repair #%s] new session: will sync %s on range %s for %s.%s", getName(), repairedNodes(), range, tablename, Arrays.toString(cfnames))); + if (endpoints.isEmpty()) { differencingDone.signalAll(); - logger.info("No neighbors to repair with for " + tablename + " on " + range + ": " + getName() + " completed."); + logger.info("[repair #%s] No neighbors to repair with on range %s: session completed", getName(), range); return; } @@ -652,7 +652,7 @@ public class AntiEntropyService if (!FailureDetector.instance.isAlive(endpoint)) { differencingDone.signalAll(); - logger.info("Could not proceed on repair because a neighbor (" + endpoint + ") is dead: " + getName() + " failed."); + logger.info("[repair #%s] Could not proceed on repair because a neighbor (%s) is dead: session failed", getName(), endpoint); return; } } @@ -675,8 +675,15 @@ public class AntiEntropyService // block whatever thread started this session until all requests have been returned: // if this thread dies, the session will still complete in the background completed.await(); - if (exception != null) + if (exception == null) + { + 
logger.info(String.format("[repair #%s] session completed successfully", getName())); + } + else + { + logger.error(String.format("[repair #%s] session completed with the following error", getName()), exception); throw exception; + } } catch (InterruptedException e) { @@ -690,13 +697,19 @@ public class AntiEntropyService } } - void completed(InetAddress remote, String cfname) + void completed(Differencer differencer) { - logger.debug("Repair completed for {} on {}", remote, cfname); - RepairJob job = activeJobs.get(cfname); - if (job.completedSynchronizationJob(remote)) - { - activeJobs.remove(cfname); + logger.debug(String.format("[repair #%s] Repair completed between %s and %s on %s", + getName(), + differencer.r1.endpoint, + differencer.r2.endpoint, + differencer.cfname)); + RepairJob job = activeJobs.get(differencer.cfname); + if (job.completedSynchronization(differencer)) + { + activeJobs.remove(differencer.cfname); + String remaining = activeJobs.size() == 0 ? "" : String.format(" (%d remaining column family to sync for this session)", activeJobs.size()); + logger.info(String.format("[repair #%s] %s is fully synced%s", getName(), differencer.cfname, remaining)); if (activeJobs.isEmpty()) completed.signalAll(); } @@ -704,8 +717,7 @@ public class AntiEntropyService void failedNode(InetAddress remote) { - String errorMsg = String.format("Problem during repair session %s, endpoint %s died", sessionName, remote); - logger.error(errorMsg); + String errorMsg = String.format("Endpoint %s died", remote); exception = new IOException(errorMsg); // If a node failed, we stop everything (though there could still be some activity in the background) jobs.clear(); @@ -749,9 +761,13 @@ public class AntiEntropyService class RepairJob { private final String cfname; - private final Set<InetAddress> requestedEndpoints = new HashSet<InetAddress>(); - private final Map<InetAddress, MerkleTree> trees = new HashMap<InetAddress, MerkleTree>(); - private final Set<InetAddress> syncJobs 
= new HashSet<InetAddress>(); + // first we send tree requests. this tracks the endpoints remaining to hear from + private final Set<InetAddress> remainingEndpoints = new HashSet<InetAddress>(); + // tree responses are then tracked here + private final List<TreeResponse> trees = new ArrayList<TreeResponse>(endpoints.size() + 1); + // once all responses are received, each tree is compared with each other, and differencer tasks + // are submitted. the job is done when all differencers are complete. + private final Set<Differencer> remainingDifferencers = new HashSet<Differencer>(); public RepairJob(String cfname) { @@ -763,24 +779,28 @@ public class AntiEntropyService */ public void sendTreeRequests() { - requestedEndpoints.addAll(endpoints); - requestedEndpoints.add(FBUtilities.getBroadcastAddress()); + remainingEndpoints.addAll(endpoints); + remainingEndpoints.add(FBUtilities.getBroadcastAddress()); // send requests to all nodes - for (InetAddress endpoint : requestedEndpoints) + for (InetAddress endpoint : remainingEndpoints) AntiEntropyService.instance.request(getName(), endpoint, range, tablename, cfname); + + logger.info(String.format("[repair #%s] requests for merkle tree sent for %s (to %s)", getName(), cfname, remainingEndpoints)); } /** * Add a new received tree and return the number of remaining tree to * be received for the job to be complete. + * + * Callers may assume exactly one addTree call will result in zero remaining endpoints. 
*/ public synchronized int addTree(TreeRequest request, MerkleTree tree) { assert request.cf.right.equals(cfname); - trees.put(request.endpoint, tree); - requestedEndpoints.remove(request.endpoint); - return requestedEndpoints.size(); + trees.add(new TreeResponse(request.endpoint, tree)); + remainingEndpoints.remove(request.endpoint); + return remainingEndpoints.size(); } /** @@ -789,30 +809,31 @@ public class AntiEntropyService */ public void submitDifferencers() { - assert requestedEndpoints.size() == 0; + assert remainingEndpoints.isEmpty(); - // Right now, we only difference local host against each other. CASSANDRA-2610 will fix that. - // In the meantime ugly special casing will work good enough. - MerkleTree localTree = - trees.get(FBUtilities.getBroadcastAddress()); - assert localTree != null; - for (Map.Entry<InetAddress, MerkleTree> entry : trees.entrySet()) + // We need to difference all trees one against another + for (int i = 0; i < trees.size() - 1; ++i) { - if (entry.getKey().equals(FBUtilities.getBroadcastAddress())) - continue; - - Differencer differencer = new Differencer(cfname, entry.getKey(), entry.getValue(), localTree); - syncJobs.add(entry.getKey()); - logger.debug("Queueing comparison " + differencer); - StageManager.getStage(Stage.ANTI_ENTROPY).execute(differencer); + TreeResponse r1 = trees.get(i); + for (int j = i + 1; j < trees.size(); ++j) + { + TreeResponse r2 = trees.get(j); + Differencer differencer = new Differencer(cfname, r1, r2); + logger.debug("Queueing comparison {}", differencer); + remainingDifferencers.add(differencer); + StageManager.getStage(Stage.ANTI_ENTROPY).execute(differencer); + } } trees.clear(); // allows gc to do its thing } - synchronized boolean completedSynchronizationJob(InetAddress remote) + /** + * @return true if the @param differencer was the last remaining + */ + synchronized boolean completedSynchronization(Differencer differencer) { - syncJobs.remove(remote); - return syncJobs.isEmpty(); + 
remainingDifferencers.remove(differencer); + return remainingDifferencers.isEmpty(); } } @@ -822,17 +843,15 @@ public class AntiEntropyService class Differencer implements Runnable { public final String cfname; - public final InetAddress remote; - public final MerkleTree ltree; - public final MerkleTree rtree; + public final TreeResponse r1; + public final TreeResponse r2; public List<Range> differences; - Differencer(String cfname, InetAddress remote, MerkleTree ltree, MerkleTree rtree) + Differencer(String cfname, TreeResponse r1, TreeResponse r2) { this.cfname = cfname; - this.remote = remote; - this.ltree = ltree; - this.rtree = rtree; + this.r1 = r1; + this.r2 = r2; this.differences = new ArrayList<Range>(); } @@ -841,88 +860,65 @@ public class AntiEntropyService */ public void run() { - InetAddress local = FBUtilities.getBroadcastAddress(); - // restore partitioners (in case we were serialized) - if (ltree.partitioner() == null) - ltree.partitioner(StorageService.getPartitioner()); - if (rtree.partitioner() == null) - rtree.partitioner(StorageService.getPartitioner()); + if (r1.tree.partitioner() == null) + r1.tree.partitioner(StorageService.getPartitioner()); + if (r2.tree.partitioner() == null) + r2.tree.partitioner(StorageService.getPartitioner()); // compare trees, and collect differences - differences.addAll(MerkleTree.difference(ltree, rtree)); + differences.addAll(MerkleTree.difference(r1.tree, r2.tree)); // choose a repair method based on the significance of the difference - String format = "Endpoints " + local + " and " + remote + " %s for " + cfname + " on " + range; + String format = String.format("[repair #%s] Endpoints %s and %s %%s for %s", getName(), r1.endpoint, r2.endpoint, cfname); if (differences.isEmpty()) { logger.info(String.format(format, "are consistent")); - completed(remote, cfname); + completed(this); return; } // non-0 difference: perform streaming repair logger.info(String.format(format, "have " + differences.size() + " range(s) 
out of sync")); - try - { - performStreamingRepair(); - } - catch(IOException e) - { - throw new RuntimeException(e); - } + performStreamingRepair(); } /** * Starts sending/receiving our list of differences to/from the remote endpoint: creates a callback * that will be called out of band once the streams complete. */ - void performStreamingRepair() throws IOException + void performStreamingRepair() { - logger.info("Performing streaming repair of " + differences.size() + " ranges with " + remote + " for " + range); - ColumnFamilyStore cfstore = Table.open(tablename).getColumnFamilyStore(cfname); - try - { - // We acquire references for transferSSTables - Collection<SSTableReader> sstables = cfstore.markCurrentSSTablesReferenced(); - Callback callback = new Callback(); - // send ranges to the remote node - StreamOutSession outsession = StreamOutSession.create(tablename, remote, callback); - StreamOut.transferSSTables(outsession, sstables, differences, OperationType.AES); - // request ranges from the remote node - StreamIn.requestRanges(remote, tablename, differences, callback, OperationType.AES); - } - catch(Exception e) + Runnable callback = new Runnable() { - throw new IOException("Streaming repair failed.", e); - } + public void run() + { + completed(Differencer.this); + } + }; + StreamingRepairTask task = StreamingRepairTask.create(r1.endpoint, r2.endpoint, tablename, cfname, differences, callback); + + // Pre 1.0, nodes don't know how to handle forwarded streaming task so don't bother + if (task.isLocalTask() || Gossiper.instance.getVersion(task.dst) >= MessagingService.VERSION_10) + task.run(); } public String toString() { - return "#<Differencer " + remote + "/" + range + ">"; + return "#<Differencer " + r1.endpoint + "<->" + r2.endpoint + "/" + range + ">"; } + } + } - /** - * When a repair is necessary, this callback is created to wait for the inbound - * and outbound streams to complete. 
- */ - class Callback extends WrappedRunnable - { - // we expect one callback for the receive, and one for the send - private final AtomicInteger outstanding = new AtomicInteger(2); + static class TreeResponse + { + public final InetAddress endpoint; + public final MerkleTree tree; - protected void runMayThrow() throws Exception - { - if (outstanding.decrementAndGet() > 0) - // waiting on more calls - return; - - // all calls finished successfully - completed(remote, cfname); - logger.info(String.format("Finished streaming repair with %s for %s", remote, range)); - } - } + TreeResponse(InetAddress endpoint, MerkleTree tree) + { + this.endpoint = endpoint; + this.tree = tree; } } Modified: cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java?rev=1164463&r1=1164462&r2=1164463&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/service/StorageService.java Fri Sep 2 10:24:21 2011 @@ -107,6 +107,8 @@ public class StorageService implements I REPLICATION_FINISHED, INTERNAL_RESPONSE, // responses to internal calls COUNTER_MUTATION, + STREAMING_REPAIR_REQUEST, + STREAMING_REPAIR_RESPONSE, // use as padding for backwards compatability where a previous version needs to validate a verb from the future. 
UNUSED_1, UNUSED_2, @@ -129,6 +131,8 @@ public class StorageService implements I put(Verb.BOOTSTRAP_TOKEN, Stage.MISC); put(Verb.TREE_REQUEST, Stage.ANTI_ENTROPY); put(Verb.TREE_RESPONSE, Stage.ANTI_ENTROPY); + put(Verb.STREAMING_REPAIR_REQUEST, Stage.ANTI_ENTROPY); + put(Verb.STREAMING_REPAIR_RESPONSE, Stage.ANTI_ENTROPY); put(Verb.GOSSIP_DIGEST_ACK, Stage.GOSSIP); put(Verb.GOSSIP_DIGEST_ACK2, Stage.GOSSIP); put(Verb.GOSSIP_DIGEST_SYN, Stage.GOSSIP); @@ -245,6 +249,8 @@ public class StorageService implements I MessagingService.instance().registerVerbHandlers(Verb.INTERNAL_RESPONSE, new ResponseVerbHandler()); MessagingService.instance().registerVerbHandlers(Verb.TREE_REQUEST, new TreeRequestVerbHandler()); MessagingService.instance().registerVerbHandlers(Verb.TREE_RESPONSE, new AntiEntropyService.TreeResponseVerbHandler()); + MessagingService.instance().registerVerbHandlers(Verb.STREAMING_REPAIR_REQUEST, new StreamingRepairTask.StreamingRepairRequest()); + MessagingService.instance().registerVerbHandlers(Verb.STREAMING_REPAIR_RESPONSE, new StreamingRepairTask.StreamingRepairResponse()); MessagingService.instance().registerVerbHandlers(Verb.GOSSIP_DIGEST_SYN, new GossipDigestSynVerbHandler()); MessagingService.instance().registerVerbHandlers(Verb.GOSSIP_DIGEST_ACK, new GossipDigestAckVerbHandler()); Added: cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java?rev=1164463&view=auto ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java (added) +++ cassandra/trunk/src/java/org/apache/cassandra/streaming/StreamingRepairTask.java Fri Sep 2 10:24:21 2011 @@ -0,0 +1,304 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.cassandra.streaming; + +import java.io.*; +import java.net.InetAddress; +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Table; +import org.apache.cassandra.dht.AbstractBounds; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.io.ICompactSerializer; +import org.apache.cassandra.io.sstable.SSTableReader; +import org.apache.cassandra.net.*; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.UUIDGen; + +/** + * Task that makes two nodes exchange (stream) some ranges (for a given table/cf). + * This handles the case where the local node is neither of the two nodes that + * must stream their range, and allows registering a callback to be called on + * completion. 
+ */ +public class StreamingRepairTask implements Runnable +{ + private static final Logger logger = LoggerFactory.getLogger(StreamingRepairTask.class); + + // map of tasks created on this node + private static final ConcurrentMap<UUID, StreamingRepairTask> tasks = new ConcurrentHashMap<UUID, StreamingRepairTask>(); + private static final StreamingRepairTaskSerializer serializer = new StreamingRepairTaskSerializer(); + + public final UUID id; + private final InetAddress owner; // the node where the task is created; can be == src but doesn't need to be + public final InetAddress src; + public final InetAddress dst; + + private final String tableName; + private final String cfName; + private final Collection<Range> ranges; + private final Runnable callback; + + private StreamingRepairTask(UUID id, InetAddress owner, InetAddress src, InetAddress dst, String tableName, String cfName, Collection<Range> ranges, Runnable callback) + { + this.id = id; + this.owner = owner; + this.src = src; + this.dst = dst; + this.tableName = tableName; + this.cfName = cfName; + this.ranges = ranges; + this.callback = callback; + } + + public static StreamingRepairTask create(InetAddress ep1, InetAddress ep2, String tableName, String cfName, Collection<Range> ranges, Runnable callback) + { + InetAddress local = FBUtilities.getLocalAddress(); + UUID id = UUIDGen.makeType1UUIDFromHost(local); + // We can take either of the nodes as source or destination; however, if one is localhost, we put it as source to avoid forwarding + InetAddress src = ep2.equals(local) ? ep2 : ep1; + InetAddress dst = ep2.equals(local) ? ep1 : ep2; + StreamingRepairTask task = new StreamingRepairTask(id, local, src, dst, tableName, cfName, ranges, wrapCallback(callback, id, local.equals(src))); + tasks.put(id, task); + return task; + } + + /** + * Returns true if the task can be executed locally, false if + * it has to be forwarded. 
+ */ + public boolean isLocalTask() + { + return owner.equals(src); + } + + public void run() + { + if (src.equals(FBUtilities.getLocalAddress())) + { + initiateStreaming(); + } + else + { + forwardToSource(); + } + } + + private void initiateStreaming() + { + ColumnFamilyStore cfstore = Table.open(tableName).getColumnFamilyStore(cfName); + try + { + logger.info(String.format("[streaming task #%s] Performing streaming repair of %d ranges with %s", id, ranges.size(), dst)); + Collection<SSTableReader> sstables = cfstore.getSSTables(); + // send ranges to the remote node + StreamOutSession outsession = StreamOutSession.create(tableName, dst, callback); + StreamOut.transferSSTables(outsession, sstables, ranges, OperationType.AES); + // request ranges from the remote node + StreamIn.requestRanges(dst, tableName, ranges, callback, OperationType.AES); + } + catch(Exception e) + { + throw new RuntimeException("Streaming repair failed", e); + } + } + + private void forwardToSource() + { + try + { + logger.info(String.format("[streaming task #%s] Forwarding streaming repair of %d ranges to %s (to be streamed with %s)", id, ranges.size(), src, dst)); + StreamingRepairRequest.send(this); + } + catch (IOException e) + { + throw new RuntimeException("Error forwarding streaming task to " + src, e); + } + } + + private static Runnable makeReplyingCallback(final InetAddress taskOwner, final UUID taskId) + { + return new Runnable() + { + // we expect one callback for the receive, and one for the send + private final AtomicInteger outstanding = new AtomicInteger(2); + + public void run() + { + if (outstanding.decrementAndGet() > 0) + // waiting on more calls + return; + + try + { + StreamingRepairResponse.reply(taskOwner, taskId); + } + catch (IOException e) + { + throw new IOError(e); + } + } + }; + } + + // wrap a given callback so as to unregister the streaming repair task on completion + private static Runnable wrapCallback(final Runnable callback, final UUID taskid, final 
boolean isLocalTask) + { + return new Runnable() + { + // we expect one callback for the receive, and one for the send + private final AtomicInteger outstanding = new AtomicInteger(isLocalTask ? 2 : 1); + + public void run() + { + if (outstanding.decrementAndGet() > 0) + // waiting on more calls + return; + + tasks.remove(taskid); + if (callback != null) + callback.run(); + } + }; + } + + public static class StreamingRepairRequest implements IVerbHandler + { + public void doVerb(Message message, String id) + { + byte[] bytes = message.getMessageBody(); + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes)); + + StreamingRepairTask task; + try + { + task = StreamingRepairTask.serializer.deserialize(dis, message.getVersion()); + } + catch (IOException e) + { + throw new IOError(e); + } + + assert task.src.equals(FBUtilities.getLocalAddress()); + assert task.owner.equals(message.getFrom()); + + logger.info(String.format("[streaming task #%s] Received task from %s to stream %d ranges to %s", task.id, message.getFrom(), task.ranges.size(), task.dst)); + + task.run(); + } + + private static void send(StreamingRepairTask task) throws IOException + { + int version = Gossiper.instance.getVersion(task.src); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + StreamingRepairTask.serializer.serialize(task, dos, version); + Message msg = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.STREAMING_REPAIR_REQUEST, bos.toByteArray(), version); + MessagingService.instance().sendOneWay(msg, task.src); + } + } + + public static class StreamingRepairResponse implements IVerbHandler + { + public void doVerb(Message message, String id) + { + byte[] bytes = message.getMessageBody(); + DataInputStream dis = new DataInputStream(new ByteArrayInputStream(bytes)); + + UUID taskid; + try + { + taskid = UUIDGen.read(dis); + } + catch (IOException e) + { + throw new IOError(new IOException("Error 
reading stream repair response from " + message.getFrom(), e)); + } + + StreamingRepairTask task = tasks.get(taskid); + if (task == null) + { + logger.error(String.format("Received a stream repair response from %s for unknow taks %s (have this node been restarted recently?)", message.getFrom(), taskid)); + return; + } + + assert task.owner.equals(FBUtilities.getLocalAddress()); + + logger.info(String.format("[streaming task #%s] task succeeded", task.id)); + if (task.callback != null) + task.callback.run(); + } + + private static void reply(InetAddress remote, UUID taskid) throws IOException + { + logger.info(String.format("[streaming task #%s] task suceed, forwarding response to %s", taskid, remote)); + int version = Gossiper.instance.getVersion(remote); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + UUIDGen.write(taskid, dos); + Message msg = new Message(FBUtilities.getLocalAddress(), StorageService.Verb.STREAMING_REPAIR_RESPONSE, bos.toByteArray(), version); + MessagingService.instance().sendOneWay(msg, remote); + } + } + + private static class StreamingRepairTaskSerializer implements ICompactSerializer<StreamingRepairTask> + { + public void serialize(StreamingRepairTask task, DataOutputStream dos, int version) throws IOException + { + UUIDGen.write(task.id, dos); + CompactEndpointSerializationHelper.serialize(task.owner, dos); + CompactEndpointSerializationHelper.serialize(task.src, dos); + CompactEndpointSerializationHelper.serialize(task.dst, dos); + dos.writeUTF(task.tableName); + dos.writeUTF(task.cfName); + dos.writeInt(task.ranges.size()); + for (Range range : task.ranges) + { + AbstractBounds.serializer().serialize(range, dos); + } + // We don't serialize the callback on purpose + } + + public StreamingRepairTask deserialize(DataInputStream dis, int version) throws IOException + { + UUID id = UUIDGen.read(dis); + InetAddress owner = CompactEndpointSerializationHelper.deserialize(dis); + 
InetAddress src = CompactEndpointSerializationHelper.deserialize(dis); + InetAddress dst = CompactEndpointSerializationHelper.deserialize(dis); + String tableName = dis.readUTF(dis); + String cfName = dis.readUTF(dis); + int rangesCount = dis.readInt(); + List<Range> ranges = new ArrayList<Range>(rangesCount); + for (int i = 0; i < rangesCount; ++i) + { + ranges.add((Range) AbstractBounds.serializer().deserialize(dis)); + } + return new StreamingRepairTask(id, owner, src, dst, tableName, cfName, ranges, makeReplyingCallback(owner, id)); + } + } +} Modified: cassandra/trunk/src/java/org/apache/cassandra/utils/UUIDGen.java URL: http://svn.apache.org/viewvc/cassandra/trunk/src/java/org/apache/cassandra/utils/UUIDGen.java?rev=1164463&r1=1164462&r2=1164463&view=diff ============================================================================== --- cassandra/trunk/src/java/org/apache/cassandra/utils/UUIDGen.java (original) +++ cassandra/trunk/src/java/org/apache/cassandra/utils/UUIDGen.java Fri Sep 2 10:24:21 2011 @@ -20,7 +20,9 @@ package org.apache.cassandra.utils; * */ - +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.ByteBuffer; @@ -68,6 +70,19 @@ public class UUIDGen return new UUID(raw.getLong(raw.position()), raw.getLong(raw.position() + 8)); } + /** reads a uuid from an input stream. */ + public static UUID read(DataInputStream dis) throws IOException + { + return new UUID(dis.readLong(), dis.readLong()); + } + + /** writes a uuid to an output stream. */ + public static void write(UUID uuid, DataOutputStream dos) throws IOException + { + dos.writeLong(uuid.getMostSignificantBits()); + dos.writeLong(uuid.getLeastSignificantBits()); + } + /** decomposes a uuid into raw bytes. 
*/ public static byte[] decompose(UUID uuid) { Modified: cassandra/trunk/test/unit/org/apache/cassandra/io/CompactSerializerTest.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/io/CompactSerializerTest.java?rev=1164463&r1=1164462&r2=1164463&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/io/CompactSerializerTest.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/io/CompactSerializerTest.java Fri Sep 2 10:24:21 2011 @@ -73,6 +73,7 @@ public class CompactSerializerTest exten expectedClassNames.add("InnerSerializer"); expectedClassNames.add("LeafSerializer"); expectedClassNames.add("MerkleTreeSerializer"); + expectedClassNames.add("StreamingRepairTaskSerializer"); discoveredClassNames = new ArrayList<String>(); String cp = System.getProperty("java.class.path"); Modified: cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java URL: http://svn.apache.org/viewvc/cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java?rev=1164463&r1=1164462&r2=1164463&view=diff ============================================================================== --- cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java (original) +++ cassandra/trunk/test/unit/org/apache/cassandra/service/AntiEntropyServiceTestAbstract.java Fri Sep 2 10:24:21 2011 @@ -103,8 +103,10 @@ public abstract class AntiEntropyService local_range = StorageService.instance.getLocalPrimaryRange(); - // random session id for each test - request = new TreeRequest(UUID.randomUUID().toString(), LOCAL, local_range, new CFPair(tablename, cfname)); + // (we use REMOTE instead of LOCAL so that the responses for the validator.complete() get lost) + request = new TreeRequest(UUID.randomUUID().toString(), REMOTE, local_range, new CFPair(tablename, cfname)); + // Set a fake session 
corresponding to this fake request + AntiEntropyService.instance.submitArtificialRepairSession(request, tablename, cfname); } @After @@ -159,26 +161,6 @@ public abstract class AntiEntropyService } @Test - public void testManualRepair() throws Throwable - { - RepairFuture sess = AntiEntropyService.instance.submitRepairSession(local_range, tablename, cfname); - - // ensure that the session doesn't end without a response from REMOTE - try - { - sess.get(500, TimeUnit.MILLISECONDS); - fail("Repair session should not end without response from REMOTE"); - } - catch (TimeoutException e) {} - - // deliver a fake response from REMOTE - sess.session.completed(REMOTE, request.cf.right); - - // block until the repair has completed - sess.get(); - } - - @Test public void testGetNeighborsPlusOne() throws Throwable { // generate rf+1 nodes, and ensure that all nodes are returned @@ -244,7 +226,10 @@ public abstract class AntiEntropyService interesting.add(changed); // difference the trees - AntiEntropyService.RepairSession.Differencer diff = sess.session.new Differencer(cfname, request.endpoint, ltree, rtree); + // note: we reuse the same endpoint which is bogus in theory but fine here + AntiEntropyService.TreeResponse r1 = new AntiEntropyService.TreeResponse(REMOTE, ltree); + AntiEntropyService.TreeResponse r2 = new AntiEntropyService.TreeResponse(REMOTE, rtree); + AntiEntropyService.RepairSession.Differencer diff = sess.session.new Differencer(cfname, r1, r2); diff.run(); // ensure that the changed range was recorded