http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/service/ActiveRepairService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java index f1b9cce..670aa0b 100644 --- a/src/java/org/apache/cassandra/service/ActiveRepairService.java +++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java @@ -18,6 +18,7 @@ package org.apache.cassandra.service; import java.io.File; +import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.*; @@ -26,12 +27,11 @@ import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; - +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor; -import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.dht.Bounds; @@ -39,6 +39,7 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.gms.IFailureDetector; import org.apache.cassandra.io.sstable.Component; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.locator.TokenMetadata; @@ -46,12 +47,9 @@ import org.apache.cassandra.net.IAsyncCallbackWithFailure; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.repair.*; -import org.apache.cassandra.repair.messages.AnticompactionRequest; -import org.apache.cassandra.repair.messages.PrepareMessage; -import org.apache.cassandra.repair.messages.RepairMessage; -import org.apache.cassandra.repair.messages.SyncComplete; -import org.apache.cassandra.repair.messages.ValidationComplete; +import org.apache.cassandra.repair.RepairJobDesc; +import org.apache.cassandra.repair.RepairSession; +import org.apache.cassandra.repair.messages.*; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.UUIDGen; @@ -73,21 +71,10 @@ public class ActiveRepairService { private static final Logger logger = LoggerFactory.getLogger(ActiveRepairService.class); // singleton enforcement - public static final ActiveRepairService instance = new ActiveRepairService(); + public static final ActiveRepairService instance = new ActiveRepairService(FailureDetector.instance, Gossiper.instance); public static final long UNREPAIRED_SSTABLE = 0; - private static final ThreadPoolExecutor executor; - static - { - executor = new JMXConfigurableThreadPoolExecutor(4, - 60, - TimeUnit.SECONDS, - new LinkedBlockingQueue<Runnable>(), - new NamedThreadFactory("AntiEntropySessions"), - "internal"); - } - public static enum Status { STARTED, SESSION_SUCCESS, SESSION_FAILED, FINISHED @@ -96,17 +83,17 @@ public class ActiveRepairService /** * A map of active coordinator session. */ - private final ConcurrentMap<UUID, RepairSession> sessions; + private final ConcurrentMap<UUID, RepairSession> sessions = new ConcurrentHashMap<>(); - private final ConcurrentMap<UUID, ParentRepairSession> parentRepairSessions; + private final ConcurrentMap<UUID, ParentRepairSession> parentRepairSessions = new ConcurrentHashMap<>(); - /** - * Protected constructor. Use ActiveRepairService.instance. - */ - protected ActiveRepairService() + private final IFailureDetector failureDetector; + private final Gossiper gossiper; + + public ActiveRepairService(IFailureDetector failureDetector, Gossiper gossiper) { - sessions = new ConcurrentHashMap<>(); - parentRepairSessions = new ConcurrentHashMap<>(); + this.failureDetector = failureDetector; + this.gossiper = gossiper; } /** @@ -114,51 +101,52 @@ public class ActiveRepairService * * @return Future for asynchronous call or null if there is no need to repair */ - public RepairFuture submitRepairSession(UUID parentRepairSession, Range<Token> range, String keyspace, boolean isSequential, Set<InetAddress> endpoints, String... cfnames) + public RepairSession submitRepairSession(UUID parentRepairSession, + Range<Token> range, + String keyspace, + boolean isSequential, + Set<InetAddress> endpoints, + long repairedAt, + ListeningExecutorService executor, + String... cfnames) { - RepairSession session = new RepairSession(parentRepairSession, range, keyspace, isSequential, endpoints, cfnames); - if (session.endpoints.isEmpty()) + if (endpoints.isEmpty()) return null; - RepairFuture futureTask = new RepairFuture(session); - executor.execute(futureTask); - return futureTask; - } - public void addToActiveSessions(RepairSession session) - { + final RepairSession session = new RepairSession(parentRepairSession, UUIDGen.getTimeUUID(), range, keyspace, isSequential, endpoints, repairedAt, cfnames); + sessions.put(session.getId(), session); - Gossiper.instance.register(session); - FailureDetector.instance.registerFailureDetectionEventListener(session); - } + // register listeners + gossiper.register(session); + failureDetector.registerFailureDetectionEventListener(session); - public void removeFromActiveSessions(RepairSession session) - { - Gossiper.instance.unregister(session); - sessions.remove(session.getId()); + // unregister listeners at completion + session.addListener(new Runnable() + { + /** + * When repair finished, do clean up + */ + public void run() + { + failureDetector.unregisterFailureDetectionEventListener(session); + gossiper.unregister(session); + sessions.remove(session.getId()); + } + }, MoreExecutors.sameThreadExecutor()); + session.start(executor); + return session; } public void terminateSessions() { + Throwable cause = new IOException("Terminate session is called"); for (RepairSession session : sessions.values()) { - session.forceShutdown(); + session.forceShutdown(cause); } parentRepairSessions.clear(); } - // for testing only. Create a session corresponding to a fake request and - // add it to the sessions (avoid NPE in tests) - RepairFuture submitArtificialRepairSession(RepairJobDesc desc) - { - Set<InetAddress> neighbours = new HashSet<>(); - neighbours.addAll(ActiveRepairService.getNeighbors(desc.keyspace, desc.range, null, null)); - RepairSession session = new RepairSession(desc.parentSessionId, desc.sessionId, desc.range, desc.keyspace, false, neighbours, new String[]{desc.columnFamily}); - sessions.put(session.getId(), session); - RepairFuture futureTask = new RepairFuture(session); - executor.execute(futureTask); - return futureTask; - } - /** * Return all of the neighbors with whom we share the provided range. * @@ -191,7 +179,7 @@ public class ActiveRepairService Set<InetAddress> neighbors = new HashSet<>(replicaSets.get(rangeSuperSet)); neighbors.remove(FBUtilities.getBroadcastAddress()); - if (dataCenters != null) + if (dataCenters != null && !dataCenters.isEmpty()) { TokenMetadata.Topology topology = ss.getTokenMetadata().cloneOnlyTokenMap().getTopology(); Set<InetAddress> dcEndpoints = Sets.newHashSet(); @@ -204,7 +192,7 @@ public class ActiveRepairService } return Sets.intersection(neighbors, dcEndpoints); } - else if (hosts != null) + else if (hosts != null && !hosts.isEmpty()) { Set<InetAddress> specifiedHost = new HashSet<>(); for (final String host : hosts) @@ -314,21 +302,18 @@ public class ActiveRepairService parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, sstablesToRepair, System.currentTimeMillis())); } - public void finishParentSession(UUID parentSession, Set<InetAddress> neighbors, boolean doAntiCompaction) + public void finishParentSession(UUID parentSession, Set<InetAddress> neighbors) { try { - if (doAntiCompaction) + for (InetAddress neighbor : neighbors) { - for (InetAddress neighbor : neighbors) - { - AnticompactionRequest acr = new AnticompactionRequest(parentSession); - MessageOut<RepairMessage> req = acr.createMessage(); - MessagingService.instance().sendOneWay(req, neighbor); - } - List<Future<?>> futures = doAntiCompaction(parentSession); - FBUtilities.waitOnFutures(futures); + AnticompactionRequest acr = new AnticompactionRequest(parentSession); + MessageOut<RepairMessage> req = acr.createMessage(); + MessagingService.instance().sendOneWay(req, neighbor); } + List<Future<?>> futures = doAntiCompaction(parentSession); + FBUtilities.waitOnFutures(futures); } finally { @@ -407,7 +392,7 @@ public class ActiveRepairService this.repairedAt = repairedAt; } - public Collection<SSTableReader> getAndReferenceSSTables(UUID cfId) + public synchronized Collection<SSTableReader> getAndReferenceSSTables(UUID cfId) { Set<SSTableReader> sstables = sstableMap.get(cfId); Iterator<SSTableReader> sstableIterator = sstables.iterator();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 5b5fa20..eb4c3e2 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -30,6 +30,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import javax.annotation.Nullable; import javax.management.JMX; import javax.management.MBeanServer; import javax.management.Notification; @@ -46,17 +47,13 @@ import ch.qos.logback.core.Appender; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; import com.google.common.collect.*; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.Uninterruptibles; - +import com.google.common.util.concurrent.*; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.time.DurationFormatUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.auth.Auth; -import org.apache.cassandra.concurrent.DebuggableScheduledThreadPoolExecutor; -import org.apache.cassandra.concurrent.Stage; -import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.concurrent.*; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.KSMetaData; @@ -80,8 +77,10 @@ import org.apache.cassandra.net.AsyncOneResponse; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.ResponseVerbHandler; -import org.apache.cassandra.repair.RepairFuture; import org.apache.cassandra.repair.RepairMessageVerbHandler; +import org.apache.cassandra.repair.messages.RepairOption; +import org.apache.cassandra.repair.RepairResult; +import org.apache.cassandra.repair.RepairSession; import org.apache.cassandra.service.paxos.CommitVerbHandler; import org.apache.cassandra.service.paxos.PrepareVerbHandler; import org.apache.cassandra.service.paxos.ProposeVerbHandler; @@ -2497,87 +2496,128 @@ public class StorageService extends NotificationBroadcasterSupport implements IE sendNotification(jmxNotification); } - public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies) throws IOException + public int repairAsync(String keyspace, Map<String, String> repairSpec) { - Collection<Range<Token>> ranges; - if (primaryRange) + RepairOption option = RepairOption.parse(repairSpec, getPartitioner()); + // if ranges are not specified + if (option.getRanges().isEmpty()) { - // when repairing only primary range, neither dataCenters nor hosts can be set - if (dataCenters == null && hosts == null) - ranges = getPrimaryRanges(keyspace); - // except dataCenters only contain local DC (i.e. -local) - else if (dataCenters != null && dataCenters.size() == 1 && dataCenters.contains(DatabaseDescriptor.getLocalDataCenter())) - ranges = getPrimaryRangesWithinDC(keyspace); + if (option.isPrimaryRange()) + { + // when repairing only primary range, neither dataCenters nor hosts can be set + if (option.getDataCenters().isEmpty() && option.getHosts().isEmpty()) + option.getRanges().addAll(getPrimaryRanges(keyspace)); + // except dataCenters only contain local DC (i.e. -local) + else if (option.getDataCenters().size() == 1 && option.getDataCenters().contains(DatabaseDescriptor.getLocalDataCenter())) + option.getRanges().addAll(getPrimaryRangesWithinDC(keyspace)); + else + throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster."); + } else - throw new IllegalArgumentException("You need to run primary range repair on all nodes in the cluster."); - } - else - { - ranges = getLocalRanges(keyspace); + { + option.getRanges().addAll(getLocalRanges(keyspace)); + } } - - return forceRepairAsync(keyspace, isSequential, dataCenters, hosts, ranges, fullRepair, columnFamilies); + return forceRepairAsync(keyspace, option); } - public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, Collection<Range<Token>> ranges, boolean fullRepair, String... columnFamilies) + public int forceRepairAsync(String keyspace, + boolean isSequential, + Collection<String> dataCenters, + Collection<String> hosts, + boolean primaryRange, + boolean fullRepair, + String... columnFamilies) { - if (ranges.isEmpty() || Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor() < 2) - return 0; - - int cmd = nextRepairCommand.incrementAndGet(); - if (ranges.size() > 0) + if (!FBUtilities.isUnix() && isSequential) { - if (!FBUtilities.isUnix() && isSequential) - { - logger.warn("Snapshot-based repair is not yet supported on Windows. Reverting to parallel repair."); - isSequential = false; - } - new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, hosts, fullRepair, columnFamilies)).start(); + logger.warn("Snapshot-based repair is not yet supported on Windows. Reverting to parallel repair."); + isSequential = false; } - return cmd; - } - public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies) - { - Collection<Range<Token>> ranges; - if (primaryRange) + RepairOption options = new RepairOption(isSequential, primaryRange, !fullRepair, 1, Collections.<Range<Token>>emptyList()); + if (dataCenters != null) { - ranges = isLocal ? getPrimaryRangesWithinDC(keyspace) : getPrimaryRanges(keyspace); + options.getDataCenters().addAll(dataCenters); } - else + if (hosts != null) { - ranges = getLocalRanges(keyspace); + options.getHosts().addAll(hosts); } - - return forceRepairAsync(keyspace, isSequential, isLocal, ranges, fullRepair, columnFamilies); + if (columnFamilies != null) + { + for (String columnFamily : columnFamilies) + { + options.getColumnFamilies().add(columnFamily); + } + } + return forceRepairAsync(keyspace, options); } - public int forceRepairAsync(final String keyspace, final boolean isSequential, final boolean isLocal, final Collection<Range<Token>> ranges, final boolean fullRepair, final String... columnFamilies) + public int forceRepairAsync(String keyspace, + boolean isSequential, + boolean isLocal, + boolean primaryRange, + boolean fullRepair, + String... columnFamilies) { - if (ranges.isEmpty() || Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor() < 2) - return 0; - - int cmd = nextRepairCommand.incrementAndGet(); - new Thread(createRepairTask(cmd, keyspace, ranges, isSequential, isLocal, fullRepair, columnFamilies)).start(); - return cmd; + Set<String> dataCenters = null; + if (isLocal) + { + dataCenters = Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter()); + } + return forceRepairAsync(keyspace, isSequential, dataCenters, null, primaryRange, fullRepair, columnFamilies); } - public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean fullRepair, String... columnFamilies) throws IOException + public int forceRepairRangeAsync(String beginToken, + String endToken, + String keyspaceName, + boolean isSequential, + Collection<String> dataCenters, + Collection<String> hosts, + boolean fullRepair, + String... columnFamilies) { + if (!FBUtilities.isUnix() && isSequential) + { + logger.warn("Snapshot-based repair is not yet supported on Windows. Reverting to parallel repair."); + isSequential = false; + } Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken); + RepairOption options = new RepairOption(isSequential, false, !fullRepair, 1, repairingRange); + options.getDataCenters().addAll(dataCenters); + if (hosts != null) + { + options.getHosts().addAll(hosts); + } + if (columnFamilies != null) + { + for (String columnFamily : columnFamilies) + { + options.getColumnFamilies().add(columnFamily); + } + } + logger.info("starting user-requested repair of range {} for keyspace {} and column families {}", repairingRange, keyspaceName, columnFamilies); - return forceRepairAsync(keyspaceName, isSequential, dataCenters, hosts, repairingRange, fullRepair, columnFamilies); + return forceRepairAsync(keyspaceName, options); } - public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean fullRepair, String... columnFamilies) + public int forceRepairRangeAsync(String beginToken, + String endToken, + String keyspaceName, + boolean isSequential, + boolean isLocal, + boolean fullRepair, + String... columnFamilies) { - Collection<Range<Token>> repairingRange = createRepairRangeFrom(beginToken, endToken); - - logger.info("starting user-requested repair of range {} for keyspace {} and column families {}", - repairingRange, keyspaceName, columnFamilies); - return forceRepairAsync(keyspaceName, isSequential, isLocal, repairingRange, fullRepair, columnFamilies); + Set<String> dataCenters = null; + if (isLocal) + { + dataCenters = Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter()); + } + return forceRepairRangeAsync(beginToken, endToken, keyspaceName, isSequential, dataCenters, null, fullRepair, columnFamilies); } /** @@ -2619,32 +2659,19 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return repairingRange; } - private FutureTask<Object> createRepairTask(int cmd, - String keyspace, - Collection<Range<Token>> ranges, - boolean isSequential, - boolean isLocal, - boolean fullRepair, - String... columnFamilies) + public int forceRepairAsync(String keyspace, RepairOption options) { - Set<String> dataCenters = null; - if (isLocal) - { - dataCenters = Sets.newHashSet(DatabaseDescriptor.getLocalDataCenter()); - } - return createRepairTask(cmd, keyspace, ranges, isSequential, dataCenters, null, fullRepair, columnFamilies); + if (options.getRanges().isEmpty() || Keyspace.open(keyspace).getReplicationStrategy().getReplicationFactor() < 2) + return 0; + + int cmd = nextRepairCommand.incrementAndGet(); + new Thread(createRepairTask(cmd, keyspace, options)).start(); + return cmd; } - private FutureTask<Object> createRepairTask(final int cmd, - final String keyspace, - final Collection<Range<Token>> ranges, - final boolean isSequential, - final Collection<String> dataCenters, - final Collection<String> hosts, - final boolean fullRepair, - final String... columnFamilies) + private FutureTask<Object> createRepairTask(final int cmd, final String keyspace, final RepairOption options) { - if (dataCenters != null && !dataCenters.contains(DatabaseDescriptor.getLocalDataCenter())) + if (!options.getDataCenters().isEmpty() && options.getDataCenters().contains(DatabaseDescriptor.getLocalDataCenter())) { throw new IllegalArgumentException("the local data center must be part of the repair"); } @@ -2653,11 +2680,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { protected void runMayThrow() throws Exception { - String message = String.format("Starting repair command #%d, repairing %d ranges for keyspace %s (seq=%b, full=%b)", cmd, ranges.size(), keyspace, isSequential, fullRepair); + final long startTime = System.currentTimeMillis(); + String message = String.format("Starting repair command #%d, repairing keyspace %s with %s", cmd, keyspace, options); logger.info(message); sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.STARTED.ordinal()}); - if (isSequential && !fullRepair) + if (options.isSequential() && options.isIncremental()) { message = "It is not possible to mix sequential repair and incremental repairs."; logger.error(message); @@ -2665,13 +2693,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return; } - Set<InetAddress> allNeighbors = new HashSet<>(); + final Set<InetAddress> allNeighbors = new HashSet<>(); Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>(); - for (Range<Token> range : ranges) + for (Range<Token> range : options.getRanges()) { try { - Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range, dataCenters, hosts); + Set<InetAddress> neighbors = ActiveRepairService.getNeighbors(keyspace, range, options.getDataCenters(), options.getHosts()); rangeToNeighbors.put(range, neighbors); allNeighbors.addAll(neighbors); } @@ -2685,6 +2713,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // Validate columnfamilies List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(); + String[] columnFamilies = options.getColumnFamilies().toArray(new String[options.getColumnFamilies().size()]); try { Iterables.addAll(columnFamilyStores, getValidColumnFamilies(false, false, keyspace, columnFamilies)); @@ -2695,12 +2724,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return; } - UUID parentSession = null; - if (!fullRepair) + final UUID parentSession; + long repairedAt = ActiveRepairService.UNREPAIRED_SSTABLE; + if (options.isIncremental()) { try { - parentSession = ActiveRepairService.instance.prepareForRepair(allNeighbors, ranges, columnFamilyStores); + parentSession = ActiveRepairService.instance.prepareForRepair(allNeighbors, options.getRanges(), columnFamilyStores); + repairedAt = ActiveRepairService.instance.getParentRepairSession(parentSession).repairedAt; } catch (Throwable t) { @@ -2708,60 +2739,93 @@ public class StorageService extends NotificationBroadcasterSupport implements IE return; } } + else + { + parentSession = null; + } - List<RepairFuture> futures = new ArrayList<>(ranges.size()); + // Set up RepairJob executor for this repair command. + final ListeningExecutorService executor = MoreExecutors.listeningDecorator(new JMXConfigurableThreadPoolExecutor(options.getJobThreads(), + Integer.MAX_VALUE, + TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>(), + new NamedThreadFactory("Repair#" + cmd), + "internal")); + + List<ListenableFuture<?>> futures = new ArrayList<>(options.getRanges().size()); String[] cfnames = new String[columnFamilyStores.size()]; for (int i = 0; i < columnFamilyStores.size(); i++) { cfnames[i] = columnFamilyStores.get(i).name; } - for (Range<Token> range : ranges) + for (Range<Token> range : options.getRanges()) { - RepairFuture future = ActiveRepairService.instance.submitRepairSession(parentSession, range, keyspace, isSequential, rangeToNeighbors.get(range), cfnames); - if (future == null) + final RepairSession session = ActiveRepairService.instance.submitRepairSession(parentSession, + range, + keyspace, + options.isSequential(), + rangeToNeighbors.get(range), + repairedAt, + executor, + cfnames); + if (session == null) continue; - futures.add(future); - // wait for a session to be done with its differencing before starting the next one - try + // After repair session completes, notify client its result + Futures.addCallback(session, new FutureCallback<List<RepairResult>>() { - future.session.differencingDone.await(); - } - catch (InterruptedException e) - { - message = "Interrupted while waiting for the differencing of repair session " + future.session + " to be done. Repair may be imprecise."; - logger.error(message, e); - sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.SESSION_FAILED.ordinal()}); - } + public void onSuccess(List<RepairResult> results) + { + String message = String.format("Repair session %s for range %s finished", session.getId(), session.getRange().toString()); + logger.info(message); + sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.SESSION_SUCCESS.ordinal()}); + } + + public void onFailure(Throwable t) + { + String message = String.format("Repair session %s for range %s failed with error %s", session.getId(), session.getRange().toString(), t.getMessage()); + logger.error(message, t); + sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.SESSION_FAILED.ordinal()}); + } + }); + futures.add(session); } - boolean successful = true; - for (RepairFuture future : futures) + // After all repair sessions completes(successful or not), + // run anticompaction if necessary and send finish notice back to client + ListenableFuture<?> allSessions = Futures.allAsList(futures); + Futures.addCallback(allSessions, new FutureCallback<Object>() { - try + public void onSuccess(@Nullable Object result) { - future.get(); - message = String.format("Repair session %s for range %s finished", future.session.getId(), future.session.getRange().toString()); - logger.info(message); - sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.SESSION_SUCCESS.ordinal()}); + if (options.isIncremental()) + { + try + { + ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors); + } + catch (Exception e) + { + logger.error("Error in incremental repair", e); + } + } + repairComplete(); } - catch (ExecutionException e) + + public void onFailure(Throwable t) { - successful = false; - message = String.format("Repair session %s for range %s failed with error %s", future.session.getId(), future.session.getRange().toString(), e.getCause().getMessage()); - logger.error(message, e); - sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.SESSION_FAILED.ordinal()}); + repairComplete(); } - catch (Exception e) + + private void repairComplete() { - successful = false; - message = String.format("Repair session %s for range %s failed with error %s", future.session.getId(), future.session.getRange().toString(), e.getMessage()); - logger.error(message, e); - sendNotification("repair", message, new int[]{cmd, ActiveRepairService.Status.SESSION_FAILED.ordinal()}); + String duration = DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime, true, true); + String message = String.format("Repair command #%d finished in %s", cmd, duration); + sendNotification("repair", message, + new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()}); + logger.info(message); + executor.shutdownNow(); } - } - if (!fullRepair) - ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, successful); - sendNotification("repair", String.format("Repair command #%d finished", cmd), new int[]{cmd, ActiveRepairService.Status.FINISHED.ordinal()}); + }, MoreExecutors.sameThreadExecutor()); } }, null); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/service/StorageServiceMBean.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index cc54639..203d5dc 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -270,29 +270,22 @@ public interface StorageServiceMBean extends NotificationEmitter * type: "repair" * userObject: int array of length 2, [0]=command number, [1]=ordinal of AntiEntropyService.Status * + * @param keyspace Keyspace name to repair. Should not be null. + * @param options repair option. * @return Repair command number, or 0 if nothing to repair */ + public int repairAsync(String keyspace, Map<String, String> options); + + @Deprecated public int forceRepairAsync(String keyspace, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean repairedAt, String... columnFamilies) throws IOException; - /** - * Same as forceRepairAsync, but handles a specified range - */ + @Deprecated public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean repairedAt, String... columnFamilies) throws IOException; - /** - * Invoke repair asynchronously. - * You can track repair progress by subscribing JMX notification sent from this StorageServiceMBean. - * Notification format is: - * type: "repair" - * userObject: int array of length 2, [0]=command number, [1]=ordinal of AntiEntropyService.Status - * - * @return Repair command number, or 0 if nothing to repair - */ + @Deprecated public int forceRepairAsync(String keyspace, boolean isSequential, boolean isLocal, boolean primaryRange, boolean fullRepair, String... columnFamilies); - /** - * Same as forceRepairAsync, but handles a specified range - */ + @Deprecated public int forceRepairRangeAsync(String beginToken, String endToken, String keyspaceName, boolean isSequential, boolean isLocal, boolean repairedAt, String... columnFamilies); public void forceTerminateAllRepairSessions(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java index 33da3d1..f26e439 100644 --- a/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java +++ b/src/java/org/apache/cassandra/streaming/StreamReceiveTask.java @@ -21,16 +21,15 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.UUID; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; -import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; +import org.apache.cassandra.concurrent.NamedThreadFactory; import org.apache.cassandra.config.Schema; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.io.sstable.SSTableReader; import org.apache.cassandra.io.sstable.SSTableWriter; -import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; /** @@ -38,9 +37,7 @@ import org.apache.cassandra.utils.Pair; */ public class StreamReceiveTask extends StreamTask { - private static final ThreadPoolExecutor executor = DebuggableThreadPoolExecutor.createWithMaximumPoolSize("StreamReceiveTask", - FBUtilities.getAvailableProcessors(), - 60, TimeUnit.SECONDS); + private static final ExecutorService executor = Executors.newCachedThreadPool(new NamedThreadFactory("StreamReceiveTask")); // number of files to receive private final int totalFiles; http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/tools/NodeProbe.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 70788fd..8793e92 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -43,6 +43,7 @@ import com.google.common.base.Function; import com.google.common.collect.Iterables; import com.google.common.collect.Sets; +import com.google.common.util.concurrent.AbstractFuture; import com.yammer.metrics.reporting.JmxReporter; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean; import org.apache.cassandra.db.ColumnFamilyStoreMBean; @@ -249,43 +250,15 @@ public class NodeProbe implements AutoCloseable ssProxy.forceKeyspaceFlush(keyspaceName, columnFamilies); } - public void forceRepairAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRange, boolean fullRepair, String... columnFamilies) throws IOException + public void repairAsync(final PrintStream out, final String keyspace, Map<String, String> options) throws IOException { - RepairRunner runner = new RepairRunner(out, keyspaceName, columnFamilies); + RepairRunner runner = new RepairRunner(out, ssProxy, keyspace, options); try { jmxc.addConnectionNotificationListener(runner, null, null); ssProxy.addNotificationListener(runner, null, null); - if (!runner.repairAndWait(ssProxy, isSequential, dataCenters, hosts, primaryRange, fullRepair)) - failed = true; - } - catch (Exception e) - { - throw new IOException(e) ; - } - finally - { - try - { - ssProxy.removeNotificationListener(runner); - jmxc.removeConnectionNotificationListener(runner); - } - catch (Throwable t) - { - JVMStabilityInspector.inspectThrowable(t); - out.println("Exception occurred during clean-up. " + t); - } - } - } - - public void forceRepairRangeAsync(final PrintStream out, final String keyspaceName, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, final String startToken, final String endToken, boolean fullRepair, String... columnFamilies) throws IOException - { - RepairRunner runner = new RepairRunner(out, keyspaceName, columnFamilies); - try - { - jmxc.addConnectionNotificationListener(runner, null, null); - ssProxy.addNotificationListener(runner, null, null); - if (!runner.repairRangeAndWait(ssProxy, isSequential, dataCenters, hosts, startToken, endToken, fullRepair)) + runner.run(); + if (!runner.get()) failed = true; } catch (Exception e) @@ -1273,88 +1246,3 @@ class ThreadPoolProxyMBeanIterator implements Iterator<Map.Entry<String, JMXEnab throw new UnsupportedOperationException(); } } - -class RepairRunner implements NotificationListener -{ - private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS"); - private final Condition condition = new SimpleCondition(); - private final PrintStream out; - private final String keyspace; - private final String[] columnFamilies; - private int cmd; - private volatile boolean success = true; - private volatile Exception error = null; - - RepairRunner(PrintStream out, String keyspace, String... columnFamilies) - { - this.out = out; - this.keyspace = keyspace; - this.columnFamilies = columnFamilies; - } - - public boolean repairAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, boolean primaryRangeOnly, boolean fullRepair) throws Exception - { - cmd = ssProxy.forceRepairAsync(keyspace, isSequential, dataCenters, hosts, primaryRangeOnly, fullRepair, columnFamilies); - waitForRepair(); - return success; - } - - public boolean repairRangeAndWait(StorageServiceMBean ssProxy, boolean isSequential, Collection<String> dataCenters, Collection<String> hosts, String startToken, String endToken, boolean fullRepair) throws Exception - { - cmd = ssProxy.forceRepairRangeAsync(startToken, endToken, keyspace, isSequential, dataCenters, hosts, fullRepair, columnFamilies); - waitForRepair(); - return success; - } - - private void waitForRepair() throws Exception - { - if (cmd > 0) - { - condition.await(); - } - else - { - String message = String.format("[%s] Nothing to repair for keyspace '%s'", format.format(System.currentTimeMillis()), keyspace); - out.println(message); - } - if (error != null) - { - throw error; - } - } - - public void handleNotification(Notification notification, Object handback) - { - if ("repair".equals(notification.getType())) - { - int[] status = (int[]) notification.getUserData(); - assert status.length == 2; - if (cmd == status[0]) - { - String message = String.format("[%s] %s", format.format(notification.getTimeStamp()), notification.getMessage()); - out.println(message); - // repair status is int array with [0] = cmd number, [1] = status - if (status[1] == ActiveRepairService.Status.SESSION_FAILED.ordinal()) - success = false; - else if (status[1] == ActiveRepairService.Status.FINISHED.ordinal()) - condition.signalAll(); - } - } - else if (JMXConnectionNotification.NOTIFS_LOST.equals(notification.getType())) - { - String message = String.format("[%s] Lost notification. You should check server log for repair status of keyspace %s", - format.format(notification.getTimeStamp()), - keyspace); - out.println(message); - } - else if (JMXConnectionNotification.FAILED.equals(notification.getType()) - || JMXConnectionNotification.CLOSED.equals(notification.getType())) - { - String message = String.format("JMX connection closed. You should check server log for repair status of keyspace %s" - + "(Subsequent keyspaces are not going to be repaired).", - keyspace); - error = new IOException(message); - condition.signalAll(); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/tools/NodeTool.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java index 80b0b8f..18536bf 100644 --- a/src/java/org/apache/cassandra/tools/NodeTool.java +++ b/src/java/org/apache/cassandra/tools/NodeTool.java @@ -37,6 +37,7 @@ import com.google.common.collect.Maps; import com.yammer.metrics.reporting.JmxReporter; import io.airlift.command.*; +import org.apache.commons.lang3.StringUtils; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutorMBean; import org.apache.cassandra.config.Schema; @@ -48,6 +49,7 @@ import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.locator.EndpointSnitchInfoMBean; import org.apache.cassandra.locator.LocalStrategy; import org.apache.cassandra.net.MessagingServiceMBean; +import org.apache.cassandra.repair.messages.RepairOption; import org.apache.cassandra.service.CacheServiceMBean; import org.apache.cassandra.streaming.ProgressInfo; import org.apache.cassandra.streaming.SessionInfo; @@ -1677,6 +1679,11 @@ public class NodeTool @Option(title = "full", name = {"-full", "--full"}, description = "Use -full to issue a full repair.") private boolean fullRepair = false; + @Option(title = "job_threads", name = {"-j", "--job-threads"}, description = "Number of threads to run repair jobs. " + + "Usually this means number of CFs to repair concurrently. " + + "WARNING: increasing this puts more load on repairing nodes, so be careful. (default: 1, max: 4)") + private int numJobThreads = 1; + @Override public void execute(NodeProbe probe) { @@ -1688,20 +1695,28 @@ public class NodeTool for (String keyspace : keyspaces) { + Map<String, String> options = new HashMap<>(); + options.put(RepairOption.SEQUENTIAL_KEY, Boolean.toString(sequential)); + options.put(RepairOption.PRIMARY_RANGE_KEY, Boolean.toString(primaryRange)); + options.put(RepairOption.INCREMENTAL_KEY, Boolean.toString(!fullRepair)); + options.put(RepairOption.JOB_THREADS_KEY, Integer.toString(numJobThreads)); + options.put(RepairOption.COLUMNFAMILIES_KEY, StringUtils.join(cfnames, ",")); + if (!startToken.isEmpty() || !endToken.isEmpty()) + { + options.put(RepairOption.RANGES_KEY, startToken + ":" + endToken); + } + if (localDC) + { + options.put(RepairOption.DATACENTERS_KEY, StringUtils.join(newArrayList(probe.getDataCenter()), ",")); + } + else + { + options.put(RepairOption.DATACENTERS_KEY, StringUtils.join(specificDataCenters, ",")); + } + options.put(RepairOption.HOSTS_KEY, StringUtils.join(specificHosts, ",")); try { - Collection<String> dataCenters = null; - Collection<String> hosts = null; - if (!specificDataCenters.isEmpty()) - dataCenters = newArrayList(specificDataCenters); - else if (localDC) - dataCenters = newArrayList(probe.getDataCenter()); - else if(!specificHosts.isEmpty()) - hosts = newArrayList(specificHosts); - if (!startToken.isEmpty() || !endToken.isEmpty()) - probe.forceRepairRangeAsync(System.out, keyspace, sequential, dataCenters,hosts, startToken, endToken, fullRepair); - else - probe.forceRepairAsync(System.out, keyspace, sequential, dataCenters, hosts, primaryRange, fullRepair, cfnames); + probe.repairAsync(System.out, keyspace, options); } catch (Exception e) { throw new RuntimeException("Error occurred during repair", e); http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/src/java/org/apache/cassandra/tools/RepairRunner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/tools/RepairRunner.java b/src/java/org/apache/cassandra/tools/RepairRunner.java new file mode 100644 index 0000000..1898bb4 --- /dev/null +++ b/src/java/org/apache/cassandra/tools/RepairRunner.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.tools; + +import java.io.IOException; +import java.io.PrintStream; +import java.text.SimpleDateFormat; +import java.util.Map; +import javax.management.Notification; +import javax.management.NotificationListener; +import javax.management.remote.JMXConnectionNotification; + +import com.google.common.util.concurrent.AbstractFuture; + +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.service.StorageServiceMBean; + +public class RepairRunner extends AbstractFuture<Boolean> implements Runnable, NotificationListener +{ + private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS"); + + private final PrintStream out; + private final StorageServiceMBean ssProxy; + private final String keyspace; + private final Map<String, String> options; + + private volatile int cmd; + private volatile boolean success; + + public RepairRunner(PrintStream out, StorageServiceMBean ssProxy, String keyspace, Map<String, String> options) + { + this.out = out; + this.ssProxy = ssProxy; + this.keyspace = keyspace; + this.options = options; + } + + public void run() + { + cmd = ssProxy.repairAsync(keyspace, options); + if (cmd <= 0) + { + String message = String.format("[%s] Nothing to repair for keyspace '%s'", format.format(System.currentTimeMillis()), keyspace); + out.println(message); + set(true); + } + } + + public void handleNotification(Notification notification, Object handback) + { + if ("repair".equals(notification.getType())) + { + int[] status = (int[]) notification.getUserData(); + assert status.length == 2; + if (cmd == status[0]) + { + String message = String.format("[%s] %s", format.format(notification.getTimeStamp()), notification.getMessage()); + out.println(message); + // repair status is int array with [0] = cmd number, [1] = status + if (status[1] == ActiveRepairService.Status.SESSION_FAILED.ordinal()) + { + success = false; + } + else if (status[1] == ActiveRepairService.Status.FINISHED.ordinal()) + { + set(success); + } + } + } + else if (JMXConnectionNotification.NOTIFS_LOST.equals(notification.getType())) + { + String message = String.format("[%s] Lost notification. You should check server log for repair status of keyspace %s", + format.format(notification.getTimeStamp()), + keyspace); + out.println(message); + } + else if (JMXConnectionNotification.FAILED.equals(notification.getType()) + || JMXConnectionNotification.CLOSED.equals(notification.getType())) + { + String message = String.format("JMX connection closed. You should check server log for repair status of keyspace %s" + + "(Subsequent keyspaces are not going to be repaired).", + keyspace); + setException(new IOException(message)); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/test/unit/org/apache/cassandra/repair/DifferencerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/DifferencerTest.java b/test/unit/org/apache/cassandra/repair/DifferencerTest.java deleted file mode 100644 index e1ff26e..0000000 --- a/test/unit/org/apache/cassandra/repair/DifferencerTest.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.repair; - -import java.net.InetAddress; -import java.util.Arrays; -import java.util.HashSet; -import java.util.Set; -import java.util.UUID; - -import org.junit.After; -import org.junit.BeforeClass; -import org.junit.Test; - -import org.apache.cassandra.SchemaLoader; -import org.apache.cassandra.config.KSMetaData; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.dht.Murmur3Partitioner; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.locator.SimpleStrategy; -import org.apache.cassandra.net.MessageIn; -import org.apache.cassandra.net.MessageOut; -import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.service.ActiveRepairService; -import org.apache.cassandra.sink.IMessageSink; -import org.apache.cassandra.sink.SinkManager; -import org.apache.cassandra.repair.messages.RepairMessage; -import org.apache.cassandra.repair.messages.SyncComplete; -import org.apache.cassandra.utils.MerkleTree; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -public class DifferencerTest -{ - private static final IPartitioner partirioner = new Murmur3Partitioner(); - public static final String KEYSPACE1 = "DifferencerTest"; - public static final String CF_STANDARD = "Standard1"; - - @BeforeClass - public static void defineSchema() throws Exception - { - SchemaLoader.prepareServer(); - SchemaLoader.createKeyspace(KEYSPACE1, - SimpleStrategy.class, - KSMetaData.optsWithRF(1), - SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD)); - } - - @After - public void tearDown() - { - SinkManager.clear(); - } - - /** - * When there is no difference between two, Differencer should respond SYNC_COMPLETE - */ - @Test - public void testNoDifference() throws Throwable - { - final InetAddress ep1 = InetAddress.getByName("127.0.0.1"); - final InetAddress ep2 = InetAddress.getByName("127.0.0.1"); - - SinkManager.add(new IMessageSink() - { - @SuppressWarnings("unchecked") - public MessageOut handleMessage(MessageOut message, int id, InetAddress to) - { - if (message.verb == MessagingService.Verb.REPAIR_MESSAGE) - { - RepairMessage m = (RepairMessage) message.payload; - assertEquals(RepairMessage.Type.SYNC_COMPLETE, m.messageType); - // we should see SYNC_COMPLETE - assertEquals(new NodePair(ep1, ep2), ((SyncComplete)m).nodes); - } - return null; - } - - public MessageIn handleMessage(MessageIn message, int id, InetAddress to) - { - return null; - } - }); - Range<Token> range = new Range<>(partirioner.getMinimumToken(), partirioner.getRandomToken()); - RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), KEYSPACE1, "Standard1", range); - - MerkleTree tree1 = createInitialTree(desc); - MerkleTree tree2 = createInitialTree(desc); - - // difference the trees - // note: we reuse the same endpoint which is bogus in theory but fine here - TreeResponse r1 = new TreeResponse(ep1, tree1); - TreeResponse r2 = new TreeResponse(ep2, tree2); - Differencer diff = new Differencer(desc, r1, r2); - diff.run(); - - assertTrue(diff.differences.isEmpty()); - } - - @Test - public void testDifference() throws Throwable - { - Range<Token> range = new Range<>(partirioner.getMinimumToken(), partirioner.getRandomToken()); - UUID parentRepairSession = UUID.randomUUID(); - Keyspace keyspace = Keyspace.open(KEYSPACE1); - ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1"); - - ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs), Arrays.asList(range)); - - RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", range); - - MerkleTree tree1 = createInitialTree(desc); - MerkleTree tree2 = createInitialTree(desc); - - // change a range in one of the trees - Token token = partirioner.midpoint(range.left, range.right); - tree1.invalidate(token); - MerkleTree.TreeRange changed = tree1.get(token); - changed.hash("non-empty hash!".getBytes()); - - Set<Range<Token>> interesting = new HashSet<>(); - interesting.add(changed); - - // difference the trees - // note: we reuse the same endpoint which is bogus in theory but fine here - TreeResponse r1 = new TreeResponse(InetAddress.getByName("127.0.0.1"), tree1); - TreeResponse r2 = new TreeResponse(InetAddress.getByName("127.0.0.2"), tree2); - Differencer diff = new Differencer(desc, r1, r2); - diff.run(); - - // ensure that the changed range was recorded - assertEquals("Wrong differing ranges", interesting, new HashSet<>(diff.differences)); - } - - private MerkleTree createInitialTree(RepairJobDesc desc) - { - MerkleTree tree = new MerkleTree(partirioner, desc.range, MerkleTree.RECOMMENDED_DEPTH, (int)Math.pow(2, 15)); - tree.init(); - for (MerkleTree.TreeRange r : tree.invalids()) - { - r.ensureHashInitialised(); - } - return tree; - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java new file mode 100644 index 0000000..b3d333a --- /dev/null +++ b/test/unit/org/apache/cassandra/repair/LocalSyncTaskTest.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair; + +import java.net.InetAddress; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; + +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.KSMetaData; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.SimpleStrategy; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.utils.MerkleTree; + +import static org.junit.Assert.assertEquals; + +public class LocalSyncTaskTest extends SchemaLoader +{ + private static final IPartitioner partirioner = new Murmur3Partitioner(); + public static final String KEYSPACE1 = "DifferencerTest"; + public static final String CF_STANDARD = "Standard1"; + + @BeforeClass + public static void defineSchema() throws Exception + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE1, + SimpleStrategy.class, + KSMetaData.optsWithRF(1), + SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD)); + } + + /** + * When there is no difference between two, LocalSyncTask should return stats with 0 difference. + */ + @Test + public void testNoDifference() throws Throwable + { + final InetAddress ep1 = InetAddress.getByName("127.0.0.1"); + final InetAddress ep2 = InetAddress.getByName("127.0.0.1"); + + Range<Token> range = new Range<>(partirioner.getMinimumToken(), partirioner.getRandomToken()); + RepairJobDesc desc = new RepairJobDesc(UUID.randomUUID(), UUID.randomUUID(), KEYSPACE1, "Standard1", range); + + MerkleTree tree1 = createInitialTree(desc); + MerkleTree tree2 = createInitialTree(desc); + + // difference the trees + // note: we reuse the same endpoint which is bogus in theory but fine here + TreeResponse r1 = new TreeResponse(ep1, tree1); + TreeResponse r2 = new TreeResponse(ep2, tree2); + LocalSyncTask task = new LocalSyncTask(desc, r1, r2, ActiveRepairService.UNREPAIRED_SSTABLE); + task.run(); + + assertEquals(0, task.get().numberOfDifferences); + } + + @Test + public void testDifference() throws Throwable + { + Range<Token> range = new Range<>(partirioner.getMinimumToken(), partirioner.getRandomToken()); + UUID parentRepairSession = UUID.randomUUID(); + Keyspace keyspace = Keyspace.open(KEYSPACE1); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1"); + + ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs), Arrays.asList(range)); + + RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), KEYSPACE1, "Standard1", range); + + MerkleTree tree1 = createInitialTree(desc); + MerkleTree tree2 = createInitialTree(desc); + + // change a range in one of the trees + Token token = partirioner.midpoint(range.left, range.right); + tree1.invalidate(token); + MerkleTree.TreeRange changed = tree1.get(token); + changed.hash("non-empty hash!".getBytes()); + + Set<Range<Token>> interesting = new HashSet<>(); + interesting.add(changed); + + // difference the trees + // note: we reuse the same endpoint which is bogus in theory but fine here + TreeResponse r1 = new TreeResponse(InetAddress.getByName("127.0.0.1"), tree1); + TreeResponse r2 = new TreeResponse(InetAddress.getByName("127.0.0.2"), tree2); + LocalSyncTask task = new LocalSyncTask(desc, r1, r2, ActiveRepairService.UNREPAIRED_SSTABLE); + task.run(); + + // ensure that the changed range was recorded + assertEquals("Wrong differing ranges", interesting.size(), task.getCurrentStat().numberOfDifferences); + } + + private MerkleTree createInitialTree(RepairJobDesc desc) + { + MerkleTree tree = new MerkleTree(partirioner, desc.range, MerkleTree.RECOMMENDED_DEPTH, (int)Math.pow(2, 15)); + tree.init(); + for (MerkleTree.TreeRange r : tree.invalids()) + { + r.ensureHashInitialised(); + } + return tree; + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/test/unit/org/apache/cassandra/repair/RepairSessionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/RepairSessionTest.java b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java new file mode 100644 index 0000000..9811fcc --- /dev/null +++ b/test/unit/org/apache/cassandra/repair/RepairSessionTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair; + +import java.io.IOException; +import java.net.InetAddress; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ExecutionException; + +import com.google.common.collect.Sets; +import org.junit.Test; + +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.service.ActiveRepairService; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.UUIDGen; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +public class RepairSessionTest +{ + @Test + public void testConviction() throws Exception + { + InetAddress remote = InetAddress.getByName("127.0.0.2"); + Gossiper.instance.initializeNodeUnsafe(remote, UUID.randomUUID(), 1); + + // Set up RepairSession + UUID parentSessionId = UUIDGen.getTimeUUID(); + UUID sessionId = UUID.randomUUID(); + IPartitioner p = new Murmur3Partitioner(); + Range<Token> repairRange = new Range<>(p.getToken(ByteBufferUtil.bytes(0)), p.getToken(ByteBufferUtil.bytes(100)), p); + Set<InetAddress> endpoints = Sets.newHashSet(remote); + RepairSession session = new RepairSession(parentSessionId, sessionId, repairRange, "Keyspace1", true, endpoints, ActiveRepairService.UNREPAIRED_SSTABLE, "Standard1"); + + // perform convict + session.convict(remote, Double.MAX_VALUE); + + // RepairSession should throw ExecutorException with the cause of IOException when getting its value + try + { + session.get(); + fail(); + } + catch (ExecutionException ex) + { + assertEquals(IOException.class, ex.getCause().getClass()); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java new file mode 100644 index 0000000..59ad8a3 --- /dev/null +++ b/test/unit/org/apache/cassandra/repair/messages/RepairOptionTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.repair.messages; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +import com.google.common.collect.Iterables; +import org.junit.Test; + +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; + +import static org.junit.Assert.*; + +public class RepairOptionTest +{ + @Test + public void testParseOptions() + { + IPartitioner partitioner = new Murmur3Partitioner(); + Token.TokenFactory tokenFactory = partitioner.getTokenFactory(); + + // parse with empty options + RepairOption option = RepairOption.parse(new HashMap<String, String>(), partitioner); + assertTrue(option.isSequential()); + assertFalse(option.isPrimaryRange()); + assertFalse(option.isIncremental()); + + // parse everything + Map<String, String> options = new HashMap<>(); + options.put(RepairOption.SEQUENTIAL_KEY, "false"); + options.put(RepairOption.PRIMARY_RANGE_KEY, "false"); + options.put(RepairOption.INCREMENTAL_KEY, "true"); + options.put(RepairOption.RANGES_KEY, "0:10,11:20,21:30"); + options.put(RepairOption.COLUMNFAMILIES_KEY, "cf1,cf2,cf3"); + options.put(RepairOption.DATACENTERS_KEY, "dc1,dc2,dc3"); + options.put(RepairOption.HOSTS_KEY, "127.0.0.1,127.0.0.2,127.0.0.3"); + + option = RepairOption.parse(options, partitioner); + assertFalse(option.isSequential()); + assertFalse(option.isPrimaryRange()); + assertTrue(option.isIncremental()); + + Set<Range<Token>> expectedRanges = new HashSet<>(3); + expectedRanges.add(new Range<>(tokenFactory.fromString("0"), tokenFactory.fromString("10"))); + expectedRanges.add(new Range<>(tokenFactory.fromString("11"), tokenFactory.fromString("20"))); + expectedRanges.add(new Range<>(tokenFactory.fromString("21"), tokenFactory.fromString("30"))); + assertEquals(expectedRanges, option.getRanges()); + + Set<String> expectedCFs = new HashSet<>(3); + expectedCFs.add("cf1"); + expectedCFs.add("cf2"); + expectedCFs.add("cf3"); + assertEquals(expectedCFs, option.getColumnFamilies()); + + Set<String> expectedDCs = new HashSet<>(3); + expectedDCs.add("dc1"); + expectedDCs.add("dc2"); + expectedDCs.add("dc3"); + assertEquals(expectedDCs, option.getDataCenters()); + + Set<String> expectedHosts = new HashSet<>(3); + expectedHosts.add("127.0.0.1"); + expectedHosts.add("127.0.0.2"); + expectedHosts.add("127.0.0.3"); + assertEquals(expectedHosts, option.getHosts()); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/810c2d5f/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java new file mode 100644 index 0000000..dab45f9 --- /dev/null +++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java @@ -0,0 +1,218 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +*/ +package org.apache.cassandra.service; + +import java.net.InetAddress; +import java.util.*; + +import com.google.common.collect.Sets; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.config.KSMetaData; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.locator.AbstractReplicationStrategy; +import org.apache.cassandra.locator.SimpleStrategy; +import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.utils.FBUtilities; + +import static org.junit.Assert.assertEquals; + +public class ActiveRepairServiceTest +{ + public static final String KEYSPACE5 = "Keyspace5"; + public static final String CF_STANDRAD1 = "Standard1"; + public static final String CF_COUNTER = "Counter1"; + + public String cfname; + public ColumnFamilyStore store; + public InetAddress LOCAL, REMOTE; + + private boolean initialized; + + @BeforeClass + public static void defineSchema() throws ConfigurationException + { + SchemaLoader.prepareServer(); + SchemaLoader.createKeyspace(KEYSPACE5, + SimpleStrategy.class, + KSMetaData.optsWithRF(2), + SchemaLoader.standardCFMD(KEYSPACE5, CF_COUNTER), + SchemaLoader.standardCFMD(KEYSPACE5, CF_STANDRAD1)); + } + + @Before + public void prepare() throws Exception + { + if (!initialized) + { + SchemaLoader.startGossiper(); + initialized = true; + + LOCAL = FBUtilities.getBroadcastAddress(); + // generate a fake endpoint for which we can spoof receiving/sending trees + REMOTE = InetAddress.getByName("127.0.0.2"); + } + + TokenMetadata tmd = StorageService.instance.getTokenMetadata(); + tmd.clearUnsafe(); + StorageService.instance.setTokens(Collections.singleton(StorageService.getPartitioner().getRandomToken())); + tmd.updateNormalToken(StorageService.getPartitioner().getMinimumToken(), REMOTE); + assert tmd.isMember(REMOTE); + } + + @Test + public void testGetNeighborsPlusOne() throws Throwable + { + // generate rf+1 nodes, and ensure that all nodes are returned + Set<InetAddress> expected = addTokens(1 + Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor()); + expected.remove(FBUtilities.getBroadcastAddress()); + Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5); + Set<InetAddress> neighbors = new HashSet<>(); + for (Range<Token> range : ranges) + { + neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, null, null)); + } + assertEquals(expected, neighbors); + } + + @Test + public void testGetNeighborsTimesTwo() throws Throwable + { + TokenMetadata tmd = StorageService.instance.getTokenMetadata(); + + // generate rf*2 nodes, and ensure that only neighbors specified by the ARS are returned + addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor()); + AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy(); + Set<InetAddress> expected = new HashSet<>(); + for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress())) + { + expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange)); + } + expected.remove(FBUtilities.getBroadcastAddress()); + Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5); + Set<InetAddress> neighbors = new HashSet<>(); + for (Range<Token> range : ranges) + { + neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, null, null)); + } + assertEquals(expected, neighbors); + } + + @Test + public void testGetNeighborsPlusOneInLocalDC() throws Throwable + { + TokenMetadata tmd = StorageService.instance.getTokenMetadata(); + + // generate rf+1 nodes, and ensure that all nodes are returned + Set<InetAddress> expected = addTokens(1 + Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor()); + expected.remove(FBUtilities.getBroadcastAddress()); + // remove remote endpoints + TokenMetadata.Topology topology = tmd.cloneOnlyTokenMap().getTopology(); + HashSet<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter())); + expected = Sets.intersection(expected, localEndpoints); + + Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5); + Set<InetAddress> neighbors = new HashSet<>(); + for (Range<Token> range : ranges) + { + neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null)); + } + assertEquals(expected, neighbors); + } + + @Test + public void testGetNeighborsTimesTwoInLocalDC() throws Throwable + { + TokenMetadata tmd = StorageService.instance.getTokenMetadata(); + + // generate rf*2 nodes, and ensure that only neighbors specified by the ARS are returned + addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor()); + AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy(); + Set<InetAddress> expected = new HashSet<>(); + for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress())) + { + expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange)); + } + expected.remove(FBUtilities.getBroadcastAddress()); + // remove remote endpoints + TokenMetadata.Topology topology = tmd.cloneOnlyTokenMap().getTopology(); + HashSet<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter())); + expected = Sets.intersection(expected, localEndpoints); + + Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5); + Set<InetAddress> neighbors = new HashSet<>(); + for (Range<Token> range : ranges) + { + neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null)); + } + assertEquals(expected, neighbors); + } + + @Test + public void testGetNeighborsTimesTwoInSpecifiedHosts() throws Throwable + { + TokenMetadata tmd = StorageService.instance.getTokenMetadata(); + + // generate rf*2 nodes, and ensure that only neighbors specified by the hosts are returned + addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor()); + AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy(); + List<InetAddress> expected = new ArrayList<>(); + for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress())) + { + expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange)); + } + + expected.remove(FBUtilities.getBroadcastAddress()); + Collection<String> hosts = Arrays.asList(FBUtilities.getBroadcastAddress().getCanonicalHostName(),expected.get(0).getCanonicalHostName()); + + assertEquals(expected.get(0), ActiveRepairService.getNeighbors(KEYSPACE5, + StorageService.instance.getLocalRanges(KEYSPACE5).iterator().next(), + null, hosts).iterator().next()); + } + + @Test(expected = IllegalArgumentException.class) + public void testGetNeighborsSpecifiedHostsWithNoLocalHost() throws Throwable + { + addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor()); + //Dont give local endpoint + Collection<String> hosts = Arrays.asList("127.0.0.3"); + ActiveRepairService.getNeighbors(KEYSPACE5, StorageService.instance.getLocalRanges(KEYSPACE5).iterator().next(), null, hosts); + } + + Set<InetAddress> addTokens(int max) throws Throwable + { + TokenMetadata tmd = StorageService.instance.getTokenMetadata(); + Set<InetAddress> endpoints = new HashSet<>(); + for (int i = 1; i <= max; i++) + { + InetAddress endpoint = InetAddress.getByName("127.0.0." + i); + tmd.updateNormalToken(StorageService.getPartitioner().getRandomToken(), endpoint); + endpoints.add(endpoint); + } + return endpoints; + } +}
