Merge branch 'cassandra-1.2' into trunk
Conflicts:
src/java/org/apache/cassandra/service/ActiveRepairService.java
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/a9d97e38
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/a9d97e38
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/a9d97e38
Branch: refs/heads/trunk
Commit: a9d97e381e1ee33e7e4c0ff42f29890a0bd289da
Parents: 526f98f fbc4bbc
Author: Yuki Morishita <[email protected]>
Authored: Tue Apr 30 14:29:43 2013 -0500
Committer: Yuki Morishita <[email protected]>
Committed: Tue Apr 30 14:29:43 2013 -0500
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/service/ActiveRepairService.java | 9 +++++++++
2 files changed, 10 insertions(+), 0 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9d97e38/CHANGES.txt
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/a9d97e38/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/ActiveRepairService.java
index b692ab0,0000000..1752911
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@@ -1,1135 -1,0 +1,1144 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.service;
+
+import java.io.*;
+import java.net.InetAddress;
+import java.security.MessageDigest;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Condition;
+
+import com.google.common.base.Objects;
+import com.google.common.collect.Sets;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
+import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.concurrent.Stage;
+import org.apache.cassandra.concurrent.StageManager;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.db.compaction.AbstractCompactedRow;
+import org.apache.cassandra.db.compaction.CompactionManager;
+import org.apache.cassandra.dht.AbstractBounds;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.*;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.net.*;
+import org.apache.cassandra.streaming.StreamingRepairTask;
+import org.apache.cassandra.utils.*;
+
+/**
+ * ActiveRepairService encapsulates "validating" (hashing) individual column families,
+ * exchanging MerkleTrees with remote nodes via a tree request/response conversation,
+ * and then triggering repairs for disagreeing ranges.
+ *
+ * The node where repair was invoked acts as the 'initiator,' where valid trees are sent after generation
+ * and where the local and remote tree will rendezvous in rendezvous().
+ * Once the trees rendezvous, a Differencer is executed and the service can trigger repairs
+ * for disagreeing ranges.
+ *
+ * Tree comparison and repair triggering occur in the single-threaded Stage.ANTI_ENTROPY.
+ *
+ * The steps taken to enact a repair are as follows:
+ * 1. A repair is requested via JMX/nodetool:
+ * * The initiator sends TreeRequest messages to all neighbors of the target node: when a node
+ * receives a TreeRequest, it will perform a validation (read-only) compaction to immediately validate
+ * the column family. This is performed on the CompactionManager ExecutorService.
+ * 2. The validation process builds the merkle tree by:
+ * * Calling Validator.prepare(), which samples the column family to determine key distribution,
+ * * Calling Validator.add() in order for rows in the repair range in the column family,
+ * * Calling Validator.complete() to indicate that all rows have been added.
+ * * Calling complete() indicates that a valid MerkleTree has been created for the column family.
+ * * The valid tree is returned to the requesting node via a TreeResponse.
+ * 3. When a node receives a tree response, it passes the tree to rendezvous() to see if all responses are
+ * received. Once the initiator receives all responses, it creates Differencers on every tree pair combination.
+ * 4. Differencers are executed in Stage.ANTI_ENTROPY, to compare the given two trees, and perform repair via the
+ * streaming api.
+ */
+public class ActiveRepairService
+{
+ private static final Logger logger = LoggerFactory.getLogger(ActiveRepairService.class);
+
+ // singleton enforcement
+ public static final ActiveRepairService instance = new ActiveRepairService();
+
+ private static final ThreadPoolExecutor executor;
+ static
+ {
+ executor = new JMXConfigurableThreadPoolExecutor(4,
+ 60,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ new NamedThreadFactory("AntiEntropySessions"),
+ "internal");
+ }
+
+ public static enum Status
+ {
+ STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED
+ }
+
+ /**
+ * A map of active sessions.
+ */
+ private final ConcurrentMap<String, RepairSession> sessions;
+
+ /**
+ * Protected constructor. Use ActiveRepairService.instance.
+ */
+ protected ActiveRepairService()
+ {
+ sessions = new ConcurrentHashMap<String, RepairSession>();
+ }
+
+ /**
+ * Requests repairs for the given keyspace and column families.
+ *
+ * @return Future for asynchronous call or null if there is no need to repair
+ */
+ public RepairFuture submitRepairSession(Range<Token> range, String tablename, boolean isSequential, boolean isLocal, String... cfnames)
+ {
+ RepairSession session = new RepairSession(range, tablename, isSequential, isLocal, cfnames);
+ if (session.endpoints.isEmpty())
+ return null;
+ RepairFuture futureTask = session.getFuture();
+ executor.execute(futureTask);
+ return futureTask;
+ }
+
+ public void terminateSessions()
+ {
+ for (RepairSession session : sessions.values())
+ {
+ session.forceShutdown();
+ }
+ }
+
+ // for testing only. Create a session corresponding to a fake request and
+ // add it to the sessions (avoid NPE in tests)
+ RepairFuture submitArtificialRepairSession(TreeRequest req, String tablename, String... cfnames)
+ {
+ RepairFuture futureTask = new RepairSession(req, tablename, cfnames).getFuture();
+ executor.execute(futureTask);
+ return futureTask;
+ }
+
+ /**
+ * Return all of the neighbors with whom we share the provided range.
+ *
+ * @param table table to repair
+ * @param toRepair token to repair
+ * @param isLocal need to use only nodes from local datacenter
+ *
+ * @return neighbors with whom we share the provided range
+ */
+ static Set<InetAddress> getNeighbors(String table, Range<Token> toRepair, boolean isLocal)
+ {
+ StorageService ss = StorageService.instance;
+ Map<Range<Token>, List<InetAddress>> replicaSets = ss.getRangeToAddressMap(table);
+ Range<Token> rangeSuperSet = null;
+ for (Range<Token> range : ss.getLocalRanges(table))
+ {
+ if (range.contains(toRepair))
+ {
+ rangeSuperSet = range;
+ break;
+ }
+ else if (range.intersects(toRepair))
+ {
+ throw new IllegalArgumentException("Requested range intersects a local range but is not fully contained in one; this would lead to imprecise repair");
+ }
+ }
+ if (rangeSuperSet == null || !replicaSets.containsKey(rangeSuperSet))
+ return Collections.emptySet();
+
+ Set<InetAddress> neighbors = new HashSet<InetAddress>(replicaSets.get(rangeSuperSet));
+ neighbors.remove(FBUtilities.getBroadcastAddress());
+
+ if (isLocal)
+ {
+ TokenMetadata.Topology topology = ss.getTokenMetadata().cloneOnlyTokenMap().getTopology();
+ Set<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));
+ return Sets.intersection(neighbors, localEndpoints);
+ }
+
+ return neighbors;
+ }
+
+ /**
+ * Register a tree for the given request to be compared to the appropriate trees in Stage.ANTI_ENTROPY when they become available.
+ */
+ private void rendezvous(TreeRequest request, MerkleTree tree)
+ {
+ RepairSession session = sessions.get(request.sessionid);
+ if (session == null)
+ {
+ logger.warn("Got a merkle tree response for unknown repair
session {}: either this node has been restarted since the session was started,
or the session has been interrupted for an unknown reason. ",
request.sessionid);
+ return;
+ }
+
+ RepairSession.RepairJob job = session.jobs.peek();
+ if (job == null)
+ {
+ assert session.terminated();
+ return;
+ }
+
+ logger.info(String.format("[repair #%s] Received merkle tree for %s from %s", session.getName(), request.cf.right, request.endpoint));
+
+ if (job.addTree(request, tree) == 0)
+ {
+ logger.debug("All trees received for " + session.getName() + "/"
+ request.cf.right);
+ job.submitDifferencers();
+
+ // This job is complete, switching to next in line (note that only
+ // one thread can ever do this)
+ session.jobs.poll();
+ RepairSession.RepairJob nextJob = session.jobs.peek();
+ if (nextJob == null)
+ // We are done with this repair session as far as differencing
+ // is concerned. Just inform the session
+ session.differencingDone.signalAll();
+ else
+ nextJob.sendTreeRequests();
+ }
+ }
+
+ /**
+ * Responds to the node that requested the given valid tree.
+ * @param validator A locally generated validator
+ * @param local localhost (parameterized for testing)
+ */
+ void respond(Validator validator, InetAddress local)
+ {
+ MessagingService ms = MessagingService.instance();
+
+ try
+ {
+ if (!validator.request.endpoint.equals(FBUtilities.getBroadcastAddress()))
+ logger.info(String.format("[repair #%s] Sending completed merkle tree to %s for %s", validator.request.sessionid, validator.request.endpoint, validator.request.cf));
+ ms.sendOneWay(validator.createMessage(), validator.request.endpoint);
+ }
+ catch (Exception e)
+ {
+ logger.error(String.format("[repair #%s] Error sending completed merkle tree to %s for %s ", validator.request.sessionid, validator.request.endpoint, validator.request.cf), e);
+ }
+ }
+
+ /**
+ * A Strategy to handle building a merkle tree for a column family.
+ *
+ * Lifecycle:
+ * 1. prepare() - Initialize tree with samples.
+ * 2. add() - 0 or more times, to add hashes to the tree.
+ * 3. complete() - complete building tree and send it back to the initiator
+ */
+ public static class Validator implements Runnable
+ {
+ public final TreeRequest request;
+ public final MerkleTree tree;
+
+ // null when all rows with the min token have been consumed
+ private transient long validated;
+ private transient MerkleTree.TreeRange range;
+ private transient MerkleTree.TreeRangeIterator ranges;
+ private transient DecoratedKey lastKey;
+
+ public final static MerkleTree.RowHash EMPTY_ROW = new MerkleTree.RowHash(null, new byte[0]);
+ public static ValidatorSerializer serializer = new ValidatorSerializer();
+
+ public Validator(TreeRequest request)
+ {
+ this(request,
+ // TODO: memory usage (maxsize) should either be tunable per
+ // CF, globally, or as shared for all CFs in a cluster
+ new MerkleTree(DatabaseDescriptor.getPartitioner(), request.range, MerkleTree.RECOMMENDED_DEPTH, (int)Math.pow(2, 15)));
+ }
+
+ Validator(TreeRequest request, MerkleTree tree)
+ {
+ this.request = request;
+ this.tree = tree;
+ // Reestablishing the range because we don't serialize it (for bad
+ // reason - see MerkleTree for details)
+ this.tree.fullRange = this.request.range;
+ validated = 0;
+ range = null;
+ ranges = null;
+ }
+
+ public void prepare(ColumnFamilyStore cfs)
+ {
+ if (!tree.partitioner().preservesOrder())
+ {
+ // You can't beat an even tree distribution for md5
+ tree.init();
+ }
+ else
+ {
+ List<DecoratedKey> keys = new ArrayList<DecoratedKey>();
+ for (DecoratedKey sample : cfs.keySamples(request.range))
+ {
+ assert request.range.contains(sample.token): "Token " + sample.token + " is not within range " + request.range;
+ keys.add(sample);
+ }
+
+ if (keys.isEmpty())
+ {
+ // use an even tree distribution
+ tree.init();
+ }
+ else
+ {
+ int numkeys = keys.size();
+ Random random = new Random();
+ // sample the column family using random keys from the index
+ while (true)
+ {
+ DecoratedKey dk = keys.get(random.nextInt(numkeys));
+ if (!tree.split(dk.token))
+ break;
+ }
+ }
+ }
+ logger.debug("Prepared AEService tree of size " + tree.size() + "
for " + request);
+ ranges = tree.invalids();
+ }
+
+ /**
+ * Called (in order) for rows in the given range present in the CF.
+ * Hashes the row, and adds it to the tree being built.
+ *
+ * @param row The row.
+ */
+ public void add(AbstractCompactedRow row)
+ {
+ assert request.range.contains(row.key.token) : row.key.token + " is not contained in " + request.range;
+ assert lastKey == null || lastKey.compareTo(row.key) < 0
+ : "row " + row.key + " received out of order wrt " + lastKey;
+ lastKey = row.key;
+
+ if (range == null)
+ range = ranges.next();
+
+ // generate new ranges as long as case 1 is true
+ while (!range.contains(row.key.token))
+ {
+ // add the empty hash, and move to the next range
+ range.addHash(EMPTY_ROW);
+ range = ranges.next();
+ }
+
+ // case 3 must be true: mix in the hashed row
+ range.addHash(rowHash(row));
+ }
+
+ private MerkleTree.RowHash rowHash(AbstractCompactedRow row)
+ {
+ validated++;
+ // MerkleTree uses XOR internally, so we want lots of output bits here
+ MessageDigest digest = FBUtilities.newMessageDigest("SHA-256");
+ row.update(digest);
+ return new MerkleTree.RowHash(row.key.token, digest.digest());
+ }
+
+ /**
+ * Registers the newly created tree for rendezvous in Stage.ANTI_ENTROPY.
+ */
+ public void complete()
+ {
+ completeTree();
+
+ StageManager.getStage(Stage.ANTI_ENTROPY).execute(this);
+ logger.debug("Validated " + validated + " rows into AEService
tree for " + request);
+ }
+
+ void completeTree()
+ {
+ assert ranges != null : "Validator was not prepared()";
+
+ if (range != null)
+ range.addHash(EMPTY_ROW);
+ while (ranges.hasNext())
+ {
+ range = ranges.next();
+ range.addHash(EMPTY_ROW);
+ }
+ }
+
+ /**
+ * Called after the validation lifecycle to respond with the now valid tree. Runs in Stage.ANTI_ENTROPY.
+ */
+ public void run()
+ {
+ // respond to the request that triggered this validation
+ ActiveRepairService.instance.respond(this, FBUtilities.getBroadcastAddress());
+ }
+
+ public MessageOut<Validator> createMessage()
+ {
+ return new MessageOut<Validator>(MessagingService.Verb.TREE_RESPONSE, this, Validator.serializer);
+ }
+
+ public static class ValidatorSerializer implements IVersionedSerializer<Validator>
+ {
+ public void serialize(Validator validator, DataOutput out, int version) throws IOException
+ {
+ TreeRequest.serializer.serialize(validator.request, out, version);
+ MerkleTree.serializer.serialize(validator.tree, out, version);
+ }
+
+ public Validator deserialize(DataInput in, int version) throws IOException
+ {
+ final TreeRequest request = TreeRequest.serializer.deserialize(in, version);
+ try
+ {
+ return new Validator(request, MerkleTree.serializer.deserialize(in, version));
+ }
+ catch(Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public long serializedSize(Validator validator, int version)
+ {
+ return TreeRequest.serializer.serializedSize(validator.request, version)
+ + MerkleTree.serializer.serializedSize(validator.tree, version);
+ }
+ }
+ }
+
+ /**
+ * Handler for requests from remote nodes to generate a valid tree.
+ */
+ public static class TreeRequestVerbHandler implements IVerbHandler<TreeRequest>
+ {
+ /**
+ * Trigger a validation compaction which will return the tree upon completion.
+ */
+ public void doVerb(MessageIn<TreeRequest> message, int id)
+ {
+ TreeRequest remotereq = message.payload;
+ TreeRequest request = new TreeRequest(remotereq.sessionid, message.from, remotereq.range, remotereq.gcBefore, remotereq.cf);
+
+ // trigger read-only compaction
+ ColumnFamilyStore store = Table.open(request.cf.left).getColumnFamilyStore(request.cf.right);
+ Validator validator = new Validator(request);
+ logger.debug("Queueing validation compaction for " + request);
+ CompactionManager.instance.submitValidation(store, validator);
+ }
+ }
+
+ /**
+ * Handler for responses from remote nodes which contain a valid tree.
+ * The payload is a completed Validator object from the remote endpoint.
+ */
+ public static class TreeResponseVerbHandler implements IVerbHandler<Validator>
+ {
+ public void doVerb(MessageIn<Validator> message, int id)
+ {
+ // deserialize the remote tree, and register it
+ Validator response = message.payload;
+ TreeRequest request = new TreeRequest(response.request.sessionid, message.from, response.request.range, response.request.gcBefore, response.request.cf);
+ ActiveRepairService.instance.rendezvous(request, response.tree);
+ }
+ }
+
+ /**
+ * A tuple of table and cf.
+ */
+ public static class CFPair extends Pair<String,String>
+ {
+ public CFPair(String table, String cf)
+ {
+ super(table, cf);
+ assert table != null && cf != null;
+ }
+ }
+
+ /**
+ * A tuple of table, cf, address and range that represents a location we have an outstanding TreeRequest for.
+ */
+ public static class TreeRequest
+ {
+ public static final TreeRequestSerializer serializer = new TreeRequestSerializer();
+
+ public final String sessionid;
+ public final InetAddress endpoint;
+ public final Range<Token> range;
+ public final int gcBefore;
+ public final CFPair cf;
+
+ public TreeRequest(String sessionid, InetAddress endpoint, Range<Token> range, int gcBefore, CFPair cf)
+ {
+ this.sessionid = sessionid;
+ this.endpoint = endpoint;
+ this.cf = cf;
+ this.gcBefore = gcBefore;
+ this.range = range;
+ }
+
+ @Override
+ public final int hashCode()
+ {
+ return Objects.hashCode(sessionid, endpoint, gcBefore, cf, range);
+ }
+
+ @Override
+ public final boolean equals(Object o)
+ {
+ if(!(o instanceof TreeRequest))
+ return false;
+ TreeRequest that = (TreeRequest)o;
+ // handles nulls properly
+ return Objects.equal(sessionid, that.sessionid) && Objects.equal(endpoint, that.endpoint) && gcBefore == that.gcBefore && Objects.equal(cf, that.cf) && Objects.equal(range, that.range);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "#<TreeRequest " + sessionid + ", " + endpoint + ", " +
gcBefore + ", " + cf + ", " + range + ">";
+ }
+
+ public MessageOut<TreeRequest> createMessage()
+ {
+ return new MessageOut<TreeRequest>(MessagingService.Verb.TREE_REQUEST, this, TreeRequest.serializer);
+ }
+
+ public static class TreeRequestSerializer implements IVersionedSerializer<TreeRequest>
+ {
+ public void serialize(TreeRequest request, DataOutput out, int version) throws IOException
+ {
+ out.writeUTF(request.sessionid);
+ CompactEndpointSerializationHelper.serialize(request.endpoint, out);
+
+ if (version >= MessagingService.VERSION_20)
+ out.writeInt(request.gcBefore);
+ out.writeUTF(request.cf.left);
+ out.writeUTF(request.cf.right);
+ AbstractBounds.serializer.serialize(request.range, out, version);
+ }
+
+ public TreeRequest deserialize(DataInput in, int version) throws IOException
+ {
+ String sessId = in.readUTF();
+ InetAddress endpoint = CompactEndpointSerializationHelper.deserialize(in);
+ int gcBefore = -1;
+ if (version >= MessagingService.VERSION_20)
+ gcBefore = in.readInt();
+ CFPair cfpair = new CFPair(in.readUTF(), in.readUTF());
+ Range<Token> range;
+ range = (Range<Token>) AbstractBounds.serializer.deserialize(in, version);
+
+ return new TreeRequest(sessId, endpoint, range, gcBefore, cfpair);
+ }
+
+ public long serializedSize(TreeRequest request, int version)
+ {
+ return TypeSizes.NATIVE.sizeof(request.sessionid)
+ + CompactEndpointSerializationHelper.serializedSize(request.endpoint)
+ + TypeSizes.NATIVE.sizeof(request.gcBefore)
+ + TypeSizes.NATIVE.sizeof(request.cf.left)
+ + TypeSizes.NATIVE.sizeof(request.cf.right)
+ + AbstractBounds.serializer.serializedSize(request.range, version);
+ }
+ }
+ }
+
+ /**
+ * Triggers repairs with all neighbors for the given table, cfs and range.
+ */
+ static class RepairSession extends WrappedRunnable implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
+ {
+ private final String sessionName;
+ private final boolean isSequential;
+ private final String tablename;
+ private final String[] cfnames;
+ private final Range<Token> range;
+ private volatile Exception exception;
+ private final AtomicBoolean isFailed = new AtomicBoolean(false);
+
+ private final Set<InetAddress> endpoints;
+ final Queue<RepairJob> jobs = new ConcurrentLinkedQueue<RepairJob>();
+ final Map<String, RepairJob> activeJobs = new ConcurrentHashMap<String, RepairJob>();
+
+ private final SimpleCondition completed = new SimpleCondition();
+ public final Condition differencingDone = new SimpleCondition();
+
+ private volatile boolean terminated = false;
+
+ public RepairSession(TreeRequest req, String tablename, String... cfnames)
+ {
+ this(req.sessionid, req.range, tablename, false, false, cfnames);
+ ActiveRepairService.instance.sessions.put(getName(), this);
+ }
+
+ public RepairSession(Range<Token> range, String tablename, boolean isSequential, boolean isLocal, String... cfnames)
+ {
+ this(UUIDGen.getTimeUUID().toString(), range, tablename, isSequential, isLocal, cfnames);
+ }
+
+ private RepairSession(String id, Range<Token> range, String tablename, boolean isSequential, boolean isLocal, String[] cfnames)
+ {
+ this.sessionName = id;
+ this.isSequential = isSequential;
+ this.tablename = tablename;
+ this.cfnames = cfnames;
+ assert cfnames.length > 0 : "Repairing no column families seems pointless, doesn't it";
+ this.range = range;
+ this.endpoints = ActiveRepairService.getNeighbors(tablename, range, isLocal);
+ }
+
+ public String getName()
+ {
+ return sessionName;
+ }
+
+ public Range<Token> getRange()
+ {
+ return range;
+ }
+
+ RepairFuture getFuture()
+ {
+ return new RepairFuture(this);
+ }
+
+ private String repairedNodes()
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(FBUtilities.getBroadcastAddress());
+ for (InetAddress ep : endpoints)
+ sb.append(", ").append(ep);
+ return sb.toString();
+ }
+
+ // we don't care about the return value but care about it throwing exception
+ public void runMayThrow() throws Exception
+ {
+ logger.info(String.format("[repair #%s] new session: will sync %s on range %s for %s.%s", getName(), repairedNodes(), range, tablename, Arrays.toString(cfnames)));
+
+ if (endpoints.isEmpty())
+ {
+ differencingDone.signalAll();
+ logger.info(String.format("[repair #%s] No neighbors to repair with on range %s: session completed", getName(), range));
+ return;
+ }
+
+ // Checking all nodes are live
+ for (InetAddress endpoint : endpoints)
+ {
+ if (!FailureDetector.instance.isAlive(endpoint))
+ {
+ String message = String.format("Cannot proceed on repair because a neighbor (%s) is dead: session failed", endpoint);
+ differencingDone.signalAll();
+ logger.error(String.format("[repair #%s] ", getName()) + message);
+ throw new IOException(message);
+ }
++
++ // All endpoints should be on the same protocol version
++ if (!MessagingService.instance().knowsVersion(endpoint) || MessagingService.instance().getVersion(endpoint) != MessagingService.current_version)
++ {
++ String message = "Cannot repair among different protocol versions";
++ differencingDone.signalAll();
++ logger.error(String.format("[repair #%s] ", getName()) + message);
++ throw new IOException(message);
++ }
+ }
+
+ ActiveRepairService.instance.sessions.put(getName(), this);
+ Gossiper.instance.register(this);
+ FailureDetector.instance.registerFailureDetectionEventListener(this);
+ try
+ {
+ // Create and queue a RepairJob for each column family
+ for (String cfname : cfnames)
+ {
+ RepairJob job = new RepairJob(cfname);
+ jobs.offer(job);
+ activeJobs.put(cfname, job);
+ }
+
+ jobs.peek().sendTreeRequests();
+
+ // block whatever thread started this session until all requests have been returned:
+ // if this thread dies, the session will still complete in the background
+ completed.await();
+ if (exception == null)
+ {
+ logger.info(String.format("[repair #%s] session completed successfully", getName()));
+ }
+ else
+ {
+ logger.error(String.format("[repair #%s] session completed with the following error", getName()), exception);
+ throw exception;
+ }
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException("Interrupted while waiting for repair.");
+ }
+ finally
+ {
+ // mark this session as terminated
+ terminate();
+ FailureDetector.instance.unregisterFailureDetectionEventListener(this);
+ Gossiper.instance.unregister(this);
+ ActiveRepairService.instance.sessions.remove(getName());
+ }
+ }
+
+ /**
+ * @return whether this session is terminated
+ */
+ public boolean terminated()
+ {
+ return terminated;
+ }
+
+ public void terminate()
+ {
+ terminated = true;
+ for (RepairJob job : jobs)
+ job.terminate();
+ jobs.clear();
+ activeJobs.clear();
+ }
+
+ /**
+ * terminate this session.
+ */
+ public void forceShutdown()
+ {
+ differencingDone.signalAll();
+ completed.signalAll();
+ }
+
+ void completed(Differencer differencer)
+ {
+ logger.debug(String.format("[repair #%s] Repair completed between %s and %s on %s",
+ getName(),
+ differencer.r1.endpoint,
+ differencer.r2.endpoint,
+ differencer.cfname));
+ RepairJob job = activeJobs.get(differencer.cfname);
+ if (job == null)
+ {
+ assert terminated;
+ return;
+ }
+
+ if (job.completedSynchronization(differencer))
+ {
+ activeJobs.remove(differencer.cfname);
+ String remaining = activeJobs.size() == 0 ? "" : String.format(" (%d remaining column family to sync for this session)", activeJobs.size());
+ logger.info(String.format("[repair #%s] %s is fully synced%s", getName(), differencer.cfname, remaining));
+ if (activeJobs.isEmpty())
+ completed.signalAll();
+ }
+ }
+
+ void failedNode(InetAddress remote)
+ {
+ String errorMsg = String.format("Endpoint %s died", remote);
+ exception = new IOException(errorMsg);
+ // If a node failed, we stop everything (though there could still be some activity in the background)
+ forceShutdown();
+ }
+
+ public void onJoin(InetAddress endpoint, EndpointState epState) {}
+ public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+ public void onAlive(InetAddress endpoint, EndpointState state) {}
+ public void onDead(InetAddress endpoint, EndpointState state) {}
+
+ public void onRemove(InetAddress endpoint)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ public void onRestart(InetAddress endpoint, EndpointState epState)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ public void convict(InetAddress endpoint, double phi)
+ {
+ if (!endpoints.contains(endpoint))
+ return;
+
+ // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.
+ if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+ return;
+
+ // Though unlikely, it is possible to arrive here multiple times and we
+ // want to avoid printing an error message twice
+ if (!isFailed.compareAndSet(false, true))
+ return;
+
+ failedNode(endpoint);
+ }
+
+ class RepairJob
+ {
+ private final String cfname;
+ // first we send tree requests. this tracks the endpoints remaining to hear from
+ private final RequestCoordinator<TreeRequest> treeRequests;
+ // tree responses are then tracked here
+ private final List<TreeResponse> trees = new ArrayList<TreeResponse>(endpoints.size() + 1);
+ // once all responses are received, each tree is compared with each other, and differencer tasks
+ // are submitted. the job is done when all differencers are complete.
+ private final RequestCoordinator<Differencer> differencers;
+ private final Condition requestsSent = new SimpleCondition();
+ private CountDownLatch snapshotLatch = null;
+
+ public RepairJob(String cfname)
+ {
+ this.cfname = cfname;
+ this.treeRequests = new RequestCoordinator<TreeRequest>(isSequential)
+ {
+ public void send(TreeRequest r)
+ {
+ MessagingService.instance().sendOneWay(r.createMessage(), r.endpoint);
+ }
+ };
+ this.differencers = new RequestCoordinator<Differencer>(isSequential)
+ {
+ public void send(Differencer d)
+ {
+ StageManager.getStage(Stage.ANTI_ENTROPY).execute(d);
+ }
+ };
+ }
+
+ /**
+ * Send merkle tree request to every involved neighbor.
+ */
+ public void sendTreeRequests()
+ {
+ // send requests to all nodes
+ List<InetAddress> allEndpoints = new ArrayList<InetAddress>(endpoints);
+ allEndpoints.add(FBUtilities.getBroadcastAddress());
+
+ if (isSequential)
+ makeSnapshots(endpoints);
+
+ int gcBefore = (int)(System.currentTimeMillis()/1000) - Table.open(tablename).getColumnFamilyStore(cfname).metadata.getGcGraceSeconds();
+
+ for (InetAddress endpoint : allEndpoints)
+ treeRequests.add(new TreeRequest(getName(), endpoint, range, gcBefore, new CFPair(tablename, cfname)));
+
+ logger.info(String.format("[repair #%s] requesting merkle trees for %s (to %s)", getName(), cfname, allEndpoints));
+ treeRequests.start();
+ requestsSent.signalAll();
+ }
+
+ public void makeSnapshots(Collection<InetAddress> endpoints)
+ {
+ try
+ {
+ snapshotLatch = new CountDownLatch(endpoints.size());
+ IAsyncCallback callback = new IAsyncCallback()
+ {
+ public boolean isLatencyForSnitch()
+ {
+ return false;
+ }
+
+ public void response(MessageIn msg)
+ {
+ RepairJob.this.snapshotLatch.countDown();
+ }
+ };
+ for (InetAddress endpoint : endpoints)
+ MessagingService.instance().sendRR(new SnapshotCommand(tablename, cfname, sessionName, false).createMessage(), endpoint, callback);
+ snapshotLatch.await();
+ snapshotLatch = null;
+ }
+ catch (InterruptedException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Add a new received tree and return the number of remaining trees to
+ * be received for the job to be complete.
+ *
+ * Callers may assume exactly one addTree call will result in zero remaining endpoints.
+ */
+ public synchronized int addTree(TreeRequest request, MerkleTree tree)
+ {
+ // Wait for all request to have been performed (see #3400)
+ try
+ {
+ requestsSent.await();
+ }
+ catch (InterruptedException e)
+ {
+ throw new AssertionError("Interrupted while waiting for requests to be sent");
+ }
+
+ assert request.cf.right.equals(cfname);
+ trees.add(new TreeResponse(request.endpoint, tree));
+ return treeRequests.completed(request);
+ }
+
+ /**
+ * Submit differencers for running.
+ * All trees *must* have been received before this is called.
+ */
+ public void submitDifferencers()
+ {
+ // We need to difference all trees one against another
+ for (int i = 0; i < trees.size() - 1; ++i)
+ {
+ TreeResponse r1 = trees.get(i);
+ for (int j = i + 1; j < trees.size(); ++j)
+ {
+ TreeResponse r2 = trees.get(j);
+ Differencer differencer = new Differencer(cfname, r1, r2);
+ logger.debug("Queueing comparison {}", differencer);
+ differencers.add(differencer);
+ }
+ }
+ differencers.start();
+ trees.clear(); // allows gc to do its thing
+ }
+
+ /**
+ * @return true if the differencer was the last remaining
+ */
+ synchronized boolean completedSynchronization(Differencer differencer)
+ {
+ return differencers.completed(differencer) == 0;
+ }
+
+ public void terminate()
+ {
+ if (snapshotLatch != null)
+ {
+ while (snapshotLatch.getCount() > 0)
+ snapshotLatch.countDown();
+ }
+ }
+ }
+
+ /**
+ * Runs on the node that initiated a request to compare two trees, and launch repairs for disagreeing ranges.
+ */
+ class Differencer implements Runnable
+ {
+ public final String cfname;
+ public final TreeResponse r1;
+ public final TreeResponse r2;
+ public final List<Range<Token>> differences = new ArrayList<Range<Token>>();
+
+ Differencer(String cfname, TreeResponse r1, TreeResponse r2)
+ {
+ this.cfname = cfname;
+ this.r1 = r1;
+ this.r2 = r2;
+ }
+
+ /**
+ * Compares our trees, and triggers repairs for any ranges that mismatch.
+ */
+ public void run()
+ {
+ // restore partitioners (in case we were serialized)
+ if (r1.tree.partitioner() == null)
+ r1.tree.partitioner(StorageService.getPartitioner());
+ if (r2.tree.partitioner() == null)
+ r2.tree.partitioner(StorageService.getPartitioner());
+
+ // compare trees, and collect differences
+ differences.addAll(MerkleTree.difference(r1.tree, r2.tree));
+
+ // choose a repair method based on the significance of the difference
+ String format = String.format("[repair #%s] Endpoints %s and %s %%s for %s", getName(), r1.endpoint, r2.endpoint, cfname);
+ if (differences.isEmpty())
+ {
+ logger.info(String.format(format, "are consistent"));
+ completed(this);
+ return;
+ }
+
+ // non-0 difference: perform streaming repair
+ logger.info(String.format(format, "have " + differences.size() + " range(s) out of sync"));
+ performStreamingRepair();
+ }
+
+ /**
+ * Starts sending/receiving our list of differences to/from the remote endpoint: creates a callback
+ * that will be called out of band once the streams complete.
+ */
+ void performStreamingRepair()
+ {
+ Runnable callback = new Runnable()
+ {
+ public void run()
+ {
+ completed(Differencer.this);
+ }
+ };
+ StreamingRepairTask task = StreamingRepairTask.create(r1.endpoint, r2.endpoint, tablename, cfname, differences, callback);
+
+ task.run();
+ }
+
+ public String toString()
+ {
+ return "#<Differencer " + r1.endpoint + "<->" + r2.endpoint +
"/" + range + ">";
+ }
+ }
+ }
+
+ static class TreeResponse
+ {
+ public final InetAddress endpoint;
+ public final MerkleTree tree;
+
+ TreeResponse(InetAddress endpoint, MerkleTree tree)
+ {
+ this.endpoint = endpoint;
+ this.tree = tree;
+ }
+ }
+
+ public static class RepairFuture extends FutureTask
+ {
+ public final RepairSession session;
+
+ RepairFuture(RepairSession session)
+ {
+ super(session, null);
+ this.session = session;
+ }
+ }
+
+ public static abstract class RequestCoordinator<R>
+ {
+ private final Order<R> orderer;
+
+ protected RequestCoordinator(boolean isSequential)
+ {
+ this.orderer = isSequential ? new SequentialOrder(this) : new ParallelOrder(this);
+ }
+
+ public abstract void send(R request);
+
+ public void add(R request)
+ {
+ orderer.add(request);
+ }
+
+ public void start()
+ {
+ orderer.start();
+ }
+
+ // Returns how many requests remain
+ public int completed(R request)
+ {
+ return orderer.completed(request);
+ }
+
+ private static abstract class Order<R>
+ {
+ protected final RequestCoordinator<R> coordinator;
+
+ Order(RequestCoordinator<R> coordinator)
+ {
+ this.coordinator = coordinator;
+ }
+
+ public abstract void add(R request);
+ public abstract void start();
+ public abstract int completed(R request);
+ }
+
+ private static class SequentialOrder<R> extends Order<R>
+ {
+ private final Queue<R> requests = new LinkedList<R>();
+
+ SequentialOrder(RequestCoordinator<R> coordinator)
+ {
+ super(coordinator);
+ }
+
+ public void add(R request)
+ {
+ requests.add(request);
+ }
+
+ public void start()
+ {
+ if (requests.isEmpty())
+ return;
+
+ coordinator.send(requests.peek());
+ }
+
+ public int completed(R request)
+ {
+ assert request.equals(requests.peek());
+ requests.poll();
+ int remaining = requests.size();
+ if (remaining != 0)
+ coordinator.send(requests.peek());
+ return remaining;
+ }
+ }
+
+ private static class ParallelOrder<R> extends Order<R>
+ {
+ private final Set<R> requests = new HashSet<R>();
+
+ ParallelOrder(RequestCoordinator<R> coordinator)
+ {
+ super(coordinator);
+ }
+
+ public void add(R request)
+ {
+ requests.add(request);
+ }
+
+ public void start()
+ {
+ for (R request : requests)
+ coordinator.send(request);
+ }
+
+ public int completed(R request)
+ {
+ requests.remove(request);
+ return requests.size();
+ }
+ }
+
+ }
+}
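
For reference, the substantive change this merge brings to trunk is the protocol-version guard added inside RepairSession.runMayThrow(). Below is a minimal standalone sketch of that check, assuming only the MessagingService calls already shown in the diff (knowsVersion, getVersion, current_version); the class and helper names here are hypothetical illustrations, not part of the commit.

import java.io.IOException;
import java.net.InetAddress;
import java.util.Set;

import org.apache.cassandra.net.MessagingService;

final class RepairVersionCheckSketch
{
    // Refuse to start a repair session unless every neighbor is known to speak
    // the coordinator's current messaging protocol version.
    static void checkProtocolVersions(Set<InetAddress> endpoints) throws IOException
    {
        for (InetAddress endpoint : endpoints)
        {
            if (!MessagingService.instance().knowsVersion(endpoint)
                || MessagingService.instance().getVersion(endpoint) != MessagingService.current_version)
                throw new IOException("Cannot repair among different protocol versions");
        }
    }
}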
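
The Validator lifecycle documented in the class above (prepare, then add for each row in token order, then complete) can be driven as in the sketch below. This is an illustration only: the 'rows' parameter stands in for the rows that the validation compaction feeds to the Validator in the real code path, and the wrapper class name is hypothetical.

import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.compaction.AbstractCompactedRow;
import org.apache.cassandra.service.ActiveRepairService.TreeRequest;
import org.apache.cassandra.service.ActiveRepairService.Validator;

final class ValidatorLifecycleSketch
{
    static void validate(TreeRequest request, ColumnFamilyStore cfs, Iterable<AbstractCompactedRow> rows)
    {
        Validator validator = new Validator(request);
        validator.prepare(cfs);                 // sample the column family to shape the MerkleTree
        for (AbstractCompactedRow row : rows)   // rows must be added in token order
            validator.add(row);
        validator.complete();                   // finish the tree and respond via Stage.ANTI_ENTROPY
    }
}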