Merge branch 'cassandra-2.1' into cassandra-2.2

Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/c2566d1c
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/c2566d1c
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/c2566d1c

Branch: refs/heads/cassandra-2.2
Commit: c2566d1cf9e239063a530ec08e8e098feffe387b
Parents: 52a827b 2f74831
Author: Robert Stupp <[email protected]>
Authored: Wed Jun 15 11:44:57 2016 +0200
Committer: Robert Stupp <[email protected]>
Committed: Wed Jun 15 11:44:57 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                       |  1 +
 .../apache/cassandra/repair/RepairRunnable.java   |  9 +++++++--
 .../cassandra/service/ActiveRepairService.java    |  5 +++--
 .../service/ActiveRepairServiceTest.java          | 18 ++++++++++--------
 4 files changed, 21 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2566d1c/CHANGES.txt
----------------------------------------------------------------------
diff --cc CHANGES.txt
index d0ca37f,ec2b48e..d9afaa3
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,30 -1,5 +1,31 @@@
 -2.1.15
 +2.2.7
 + * StorageService shutdown hook should use a volatile variable 
(CASSANDRA-11984)
 + * Persist local metadata earlier in startup sequence (CASSANDRA-11742)
 + * Run CommitLog tests with different compression settings (CASSANDRA-9039)
 + * cqlsh: fix tab completion for case-sensitive identifiers (CASSANDRA-11664)
 + * Avoid showing estimated key as -1 in tablestats (CASSANDRA-11587)
 + * Fix possible race condition in CommitLog.recover (CASSANDRA-11743)
 + * Enable client encryption in sstableloader with cli options 
(CASSANDRA-11708)
 + * Possible memory leak in NIODataInputStream (CASSANDRA-11867)
 + * Fix commit log replay after out-of-order flush completion (CASSANDRA-9669)
 + * Add seconds to cqlsh tracing session duration (CASSANDRA-11753)
 + * Prohibit Reverse Counter type as part of the PK (CASSANDRA-9395)
 + * cqlsh: correctly handle non-ascii chars in error messages (CASSANDRA-11626)
 + * Exit JVM if JMX server fails to startup (CASSANDRA-11540)
 + * Produce a heap dump when exiting on OOM (CASSANDRA-9861)
 + * Avoid read repairing purgeable tombstones on range slices (CASSANDRA-11427)
 + * Restore ability to filter on clustering columns when using a 2i 
(CASSANDRA-11510)
 + * JSON datetime formatting needs timezone (CASSANDRA-11137)
 + * Fix is_dense recalculation for Thrift-updated tables (CASSANDRA-11502)
 + * Remove unnecessary file existence check during anticompaction 
(CASSANDRA-11660)
 + * Add missing files to debian packages (CASSANDRA-11642)
 + * Avoid calling Iterables::concat in loops during 
ModificationStatement::getFunctions (CASSANDRA-11621)
 + * cqlsh: COPY FROM should use regular inserts for single statement batches 
and
 +   report errors correctly if workers processes crash on initialization 
(CASSANDRA-11474)
 + * Always close cluster with connection in CqlRecordWriter (CASSANDRA-11553)
 + * Fix slice queries on ordered COMPACT tables (CASSANDRA-10988)
 +Merged from 2.1:
+  * Cache local ranges when calculating repair neighbors (CASSANDRA-11933)
   * Allow LWT operation on static column with only partition keys 
(CASSANDRA-10532)
   * Create interval tree over canonical sstables to avoid missing sstables 
during streaming (CASSANDRA-11886)
   * cqlsh COPY FROM: shutdown parent cluster after forking, to avoid 
corrupting SSL connections (CASSANDRA-11749)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2566d1c/src/java/org/apache/cassandra/repair/RepairRunnable.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/repair/RepairRunnable.java
index b849cf8,0000000..f92310b
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/repair/RepairRunnable.java
+++ b/src/java/org/apache/cassandra/repair/RepairRunnable.java
@@@ -1,409 -1,0 +1,414 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.repair;
 +
 +import java.net.InetAddress;
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.concurrent.LinkedBlockingQueue;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicInteger;
 +
 +import com.google.common.collect.ImmutableMap;
 +import com.google.common.collect.Iterables;
 +import com.google.common.collect.Lists;
 +import com.google.common.util.concurrent.*;
 +import org.apache.commons.lang3.time.DurationFormatUtils;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
 +import org.apache.cassandra.concurrent.NamedThreadFactory;
 +import org.apache.cassandra.cql3.QueryOptions;
 +import org.apache.cassandra.cql3.QueryProcessor;
 +import org.apache.cassandra.cql3.UntypedResultSet;
 +import org.apache.cassandra.cql3.statements.SelectStatement;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.repair.messages.RepairOption;
 +import org.apache.cassandra.service.ActiveRepairService;
 +import org.apache.cassandra.service.QueryState;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.tracing.TraceKeyspace;
 +import org.apache.cassandra.tracing.TraceState;
 +import org.apache.cassandra.tracing.Tracing;
 +import org.apache.cassandra.transport.messages.ResultMessage;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
 +import org.apache.cassandra.utils.UUIDGen;
 +import org.apache.cassandra.utils.WrappedRunnable;
 +import org.apache.cassandra.utils.progress.ProgressEvent;
 +import org.apache.cassandra.utils.progress.ProgressEventNotifier;
 +import org.apache.cassandra.utils.progress.ProgressEventType;
 +import org.apache.cassandra.utils.progress.ProgressListener;
 +
 +public class RepairRunnable extends WrappedRunnable implements 
ProgressEventNotifier
 +{
 +    private static final Logger logger = 
LoggerFactory.getLogger(RepairRunnable.class);
 +
 +    private StorageService storageService;
 +    private final int cmd;
 +    private final RepairOption options;
 +    private final String keyspace;
 +
 +    private final List<ProgressListener> listeners = new ArrayList<>();
 +
 +    public RepairRunnable(StorageService storageService, int cmd, 
RepairOption options, String keyspace)
 +    {
 +        this.storageService = storageService;
 +        this.cmd = cmd;
 +        this.options = options;
 +        this.keyspace = keyspace;
 +    }
 +
 +    @Override
 +    public void addProgressListener(ProgressListener listener)
 +    {
 +        listeners.add(listener);
 +    }
 +
 +    @Override
 +    public void removeProgressListener(ProgressListener listener)
 +    {
 +        listeners.remove(listener);
 +    }
 +
 +    protected void fireProgressEvent(String tag, ProgressEvent event)
 +    {
 +        for (ProgressListener listener : listeners)
 +        {
 +            listener.progress(tag, event);
 +        }
 +    }
 +
 +    protected void fireErrorAndComplete(String tag, int progressCount, int 
totalProgress, String message)
 +    {
 +        fireProgressEvent(tag, new ProgressEvent(ProgressEventType.ERROR, 
progressCount, totalProgress, message));
 +        fireProgressEvent(tag, new ProgressEvent(ProgressEventType.COMPLETE, 
progressCount, totalProgress));
 +    }
 +
 +    protected void runMayThrow() throws Exception
 +    {
 +        final TraceState traceState;
 +
 +        final String tag = "repair:" + cmd;
 +
 +        final AtomicInteger progress = new AtomicInteger();
 +        final int totalProgress = 3 + options.getRanges().size(); // 
calculate neighbors, validation, prepare for repair + number of ranges to repair
 +
 +        String[] columnFamilies = options.getColumnFamilies().toArray(new 
String[options.getColumnFamilies().size()]);
 +        Iterable<ColumnFamilyStore> validColumnFamilies = 
storageService.getValidColumnFamilies(false, false, keyspace,
 +                                                                              
                  columnFamilies);
 +
 +        final long startTime = System.currentTimeMillis();
 +        String message = String.format("Starting repair command #%d, 
repairing keyspace %s with %s", cmd, keyspace,
 +                                       options);
 +        logger.info(message);
 +        fireProgressEvent(tag, new ProgressEvent(ProgressEventType.START, 0, 
100, message));
 +        if (options.isTraced())
 +        {
 +            StringBuilder cfsb = new StringBuilder();
 +            for (ColumnFamilyStore cfs : validColumnFamilies)
 +                cfsb.append(", 
").append(cfs.keyspace.getName()).append(".").append(cfs.name);
 +
 +            UUID sessionId = 
Tracing.instance.newSession(Tracing.TraceType.REPAIR);
 +            traceState = Tracing.instance.begin("repair", 
ImmutableMap.of("keyspace", keyspace, "columnFamilies",
 +                                                                          
cfsb.substring(2)));
 +            Tracing.traceRepair(message);
 +            traceState.enableActivityNotification(tag);
 +            for (ProgressListener listener : listeners)
 +                traceState.addProgressListener(listener);
 +            Thread queryThread = createQueryThread(cmd, sessionId);
 +            queryThread.setName("RepairTracePolling");
 +            queryThread.start();
 +        }
 +        else
 +        {
 +            traceState = null;
 +        }
 +
 +        final Set<InetAddress> allNeighbors = new HashSet<>();
 +        Map<Range, Set<InetAddress>> rangeToNeighbors = new HashMap<>();
++
++        // Pre-calculate the output of getLocalRanges once and pass it to getNeighbors
++        // to improve performance and avoid calculating it multiple times.
++        Collection<Range<Token>> keyspaceLocalRanges = 
storageService.getLocalRanges(keyspace);
++
 +        try
 +        {
 +            for (Range<Token> range : options.getRanges())
 +            {
-                     Set<InetAddress> neighbors = 
ActiveRepairService.getNeighbors(keyspace, range,
-                                                                               
    options.getDataCenters(),
++                    Set<InetAddress> neighbors = 
ActiveRepairService.getNeighbors(keyspace, keyspaceLocalRanges,
++                                                                              
    range, options.getDataCenters(),
 +                                                                              
    options.getHosts());
 +                    rangeToNeighbors.put(range, neighbors);
 +                    allNeighbors.addAll(neighbors);
 +            }
 +            progress.incrementAndGet();
 +        }
 +        catch (IllegalArgumentException e)
 +        {
 +            logger.error("Repair failed:", e);
 +            fireErrorAndComplete(tag, progress.get(), totalProgress, 
e.getMessage());
 +            return;
 +        }
 +
 +        // Validate columnfamilies
 +        List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>();
 +        try
 +        {
 +            Iterables.addAll(columnFamilyStores, validColumnFamilies);
 +            progress.incrementAndGet();
 +        }
 +        catch (IllegalArgumentException e)
 +        {
 +            fireErrorAndComplete(tag, progress.get(), totalProgress, 
e.getMessage());
 +            return;
 +        }
 +
 +        String[] cfnames = new String[columnFamilyStores.size()];
 +        for (int i = 0; i < columnFamilyStores.size(); i++)
 +        {
 +            cfnames[i] = columnFamilyStores.get(i).name;
 +        }
 +
 +        final UUID parentSession = UUIDGen.getTimeUUID();
 +        SystemDistributedKeyspace.startParentRepair(parentSession, keyspace, 
cfnames, options.getRanges());
 +        long repairedAt;
 +        try
 +        {
 +            ActiveRepairService.instance.prepareForRepair(parentSession, 
FBUtilities.getBroadcastAddress(), allNeighbors, options, columnFamilyStores);
 +            repairedAt = 
ActiveRepairService.instance.getParentRepairSession(parentSession).getRepairedAt();
 +            progress.incrementAndGet();
 +        }
 +        catch (Throwable t)
 +        {
 +            SystemDistributedKeyspace.failParentRepair(parentSession, t);
 +            fireErrorAndComplete(tag, progress.get(), totalProgress, 
t.getMessage());
 +            return;
 +        }
 +
 +        // Set up RepairJob executor for this repair command.
 +        final ListeningExecutorService executor = 
MoreExecutors.listeningDecorator(new 
JMXConfigurableThreadPoolExecutor(options.getJobThreads(),
 +                                                                              
                                           Integer.MAX_VALUE,
 +                                                                              
                                           TimeUnit.SECONDS,
 +                                                                              
                                           new LinkedBlockingQueue<Runnable>(),
 +                                                                              
                                           new NamedThreadFactory("Repair#" + 
cmd),
 +                                                                              
                                           "internal"));
 +
 +        List<ListenableFuture<RepairSessionResult>> futures = new 
ArrayList<>(options.getRanges().size());
 +        for (Range<Token> range : options.getRanges())
 +        {
 +            final RepairSession session = 
ActiveRepairService.instance.submitRepairSession(parentSession,
 +                                                              range,
 +                                                              keyspace,
 +                                                              
options.getParallelism(),
 +                                                              
rangeToNeighbors.get(range),
 +                                                              repairedAt,
 +                                                              executor,
 +                                                              cfnames);
 +            if (session == null)
 +                continue;
 +            // After repair session completes, notify client its result
 +            Futures.addCallback(session, new 
FutureCallback<RepairSessionResult>()
 +            {
 +                public void onSuccess(RepairSessionResult result)
 +                {
 +                    /**
 +                     * If the success message below is modified, it must also 
be updated on
 +                     * {@link 
org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport}
 +                     * for backward-compatibility support.
 +                     */
 +                    String message = String.format("Repair session %s for 
range %s finished", session.getId(),
 +                                                   
session.getRange().toString());
 +                    logger.info(message);
 +                    fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.PROGRESS,
 +                                                             
progress.incrementAndGet(),
 +                                                             totalProgress,
 +                                                             message));
 +                }
 +
 +                public void onFailure(Throwable t)
 +                {
 +                    /**
 +                     * If the failure message below is modified, it must also 
be updated on
 +                     * {@link 
org.apache.cassandra.utils.progress.jmx.LegacyJMXProgressSupport}
 +                     * for backward-compatibility support.
 +                     */
 +                    String message = String.format("Repair session %s for 
range %s failed with error %s",
 +                                                   session.getId(), 
session.getRange().toString(), t.getMessage());
 +                    logger.error(message, t);
 +                    fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.PROGRESS,
 +                                                             
progress.incrementAndGet(),
 +                                                             totalProgress,
 +                                                             message));
 +                }
 +            });
 +            futures.add(session);
 +        }
 +
 +        // After all repair sessions complete (successfully or not),
 +        // run anticompaction if necessary and send the finish notice back to the client
 +        final Collection<Range<Token>> successfulRanges = new ArrayList<>();
 +        final AtomicBoolean hasFailure = new AtomicBoolean();
 +        final ListenableFuture<List<RepairSessionResult>> allSessions = 
Futures.successfulAsList(futures);
 +        ListenableFuture anticompactionResult = 
Futures.transform(allSessions, new AsyncFunction<List<RepairSessionResult>, 
Object>()
 +        {
 +            @SuppressWarnings("unchecked")
 +            public ListenableFuture apply(List<RepairSessionResult> results) 
throws Exception
 +            {
 +                // filter out null(=failed) results and get successful ranges
 +                for (RepairSessionResult sessionResult : results)
 +                {
 +                    if (sessionResult != null)
 +                    {
 +                        successfulRanges.add(sessionResult.range);
 +                    }
 +                    else
 +                    {
 +                        hasFailure.compareAndSet(false, true);
 +                    }
 +                }
 +                return 
ActiveRepairService.instance.finishParentSession(parentSession, allNeighbors, 
successfulRanges);
 +            }
 +        });
 +        Futures.addCallback(anticompactionResult, new FutureCallback<Object>()
 +        {
 +            public void onSuccess(Object result)
 +            {
 +                
SystemDistributedKeyspace.successfulParentRepair(parentSession, 
successfulRanges);
 +                if (hasFailure.get())
 +                {
 +                    fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress,
 +                                                             "Some repair 
failed"));
 +                }
 +                else
 +                {
 +                    fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.SUCCESS, progress.get(), totalProgress,
 +                                                             "Repair 
completed successfully"));
 +                }
 +                repairComplete();
 +            }
 +
 +            public void onFailure(Throwable t)
 +            {
 +                fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.ERROR, progress.get(), totalProgress, 
t.getMessage()));
 +                SystemDistributedKeyspace.failParentRepair(parentSession, t);
 +                repairComplete();
 +            }
 +
 +            private void repairComplete()
 +            {
 +                String duration = 
DurationFormatUtils.formatDurationWords(System.currentTimeMillis() - startTime,
 +                                                                          
true, true);
 +                String message = String.format("Repair command #%d finished 
in %s", cmd, duration);
 +                fireProgressEvent(tag, new 
ProgressEvent(ProgressEventType.COMPLETE, progress.get(), totalProgress, 
message));
 +                logger.info(message);
 +                if (options.isTraced() && traceState != null)
 +                {
 +                    for (ProgressListener listener : listeners)
 +                        traceState.removeProgressListener(listener);
 +                    // Because DebuggableThreadPoolExecutor#afterExecute and 
this callback
 +                    // run in a nondeterministic order (within the same 
thread), the
 +                    // TraceState may have been nulled out at this point. The 
TraceState
 +                    // should be traceState, so just set it without bothering 
to check if it
 +                    // actually was nulled out.
 +                    Tracing.instance.set(traceState);
 +                    Tracing.traceRepair(message);
 +                    Tracing.instance.stopSession();
 +                }
 +                executor.shutdownNow();
 +            }
 +        });
 +    }
 +
 +    private Thread createQueryThread(final int cmd, final UUID sessionId)
 +    {
 +        return new Thread(new WrappedRunnable()
 +        {
 +            // Query events within a time interval that overlaps the last by 
one second. Ignore duplicates. Ignore local traces.
 +            // Wake up upon local trace activity. Query when notified of 
trace activity with a timeout that doubles every two timeouts.
 +            public void runMayThrow() throws Exception
 +            {
 +                TraceState state = Tracing.instance.get(sessionId);
 +                if (state == null)
 +                    throw new Exception("no tracestate");
 +
 +                String format = "select event_id, source, activity from %s.%s 
where session_id = ? and event_id > ? and event_id < ?;";
 +                String query = String.format(format, TraceKeyspace.NAME, 
TraceKeyspace.EVENTS);
 +                SelectStatement statement = (SelectStatement) 
QueryProcessor.parseStatement(query).prepare().statement;
 +
 +                ByteBuffer sessionIdBytes = ByteBufferUtil.bytes(sessionId);
 +                InetAddress source = FBUtilities.getBroadcastAddress();
 +
 +                HashSet<UUID>[] seen = new HashSet[] { new HashSet<>(), new 
HashSet<>() };
 +                int si = 0;
 +                UUID uuid;
 +
 +                long tlast = System.currentTimeMillis(), tcur;
 +
 +                TraceState.Status status;
 +                long minWaitMillis = 125;
 +                long maxWaitMillis = 1000 * 1024L;
 +                long timeout = minWaitMillis;
 +                boolean shouldDouble = false;
 +
 +                while ((status = state.waitActivity(timeout)) != 
TraceState.Status.STOPPED)
 +                {
 +                    if (status == TraceState.Status.IDLE)
 +                    {
 +                        timeout = shouldDouble ? Math.min(timeout * 2, 
maxWaitMillis) : timeout;
 +                        shouldDouble = !shouldDouble;
 +                    }
 +                    else
 +                    {
 +                        timeout = minWaitMillis;
 +                        shouldDouble = false;
 +                    }
 +                    ByteBuffer tminBytes = 
ByteBufferUtil.bytes(UUIDGen.minTimeUUID(tlast - 1000));
 +                    ByteBuffer tmaxBytes = 
ByteBufferUtil.bytes(UUIDGen.maxTimeUUID(tcur = System.currentTimeMillis()));
 +                    QueryOptions options = 
QueryOptions.forInternalCalls(ConsistencyLevel.ONE, 
Lists.newArrayList(sessionIdBytes,
 +                                                                              
                                    tminBytes,
 +                                                                              
                                    tmaxBytes));
 +                    ResultMessage.Rows rows = 
statement.execute(QueryState.forInternalCalls(), options);
 +                    UntypedResultSet result = 
UntypedResultSet.create(rows.result);
 +
 +                    for (UntypedResultSet.Row r : result)
 +                    {
 +                        if (source.equals(r.getInetAddress("source")))
 +                            continue;
 +                        if ((uuid = r.getUUID("event_id")).timestamp() > 
(tcur - 1000) * 10000)
 +                            seen[si].add(uuid);
 +                        if (seen[si == 0 ? 1 : 0].contains(uuid))
 +                            continue;
 +                        String message = String.format("%s: %s", 
r.getInetAddress("source"), r.getString("activity"));
 +                        fireProgressEvent("repair:" + cmd,
 +                                          new 
ProgressEvent(ProgressEventType.NOTIFICATION, 0, 0, message));
 +                    }
 +                    tlast = tcur;
 +
 +                    si = si == 0 ? 1 : 0;
 +                    seen[si].clear();
 +                }
 +            }
 +        });
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2566d1c/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/cassandra/blob/c2566d1c/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
diff --cc test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
index 7793660,26e5126..f34d0e2
--- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@@ -50,180 -43,11 +50,182 @@@ import org.apache.cassandra.utils.concu
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertFalse;
  
 -public class ActiveRepairServiceTest extends SchemaLoader
 +public class ActiveRepairServiceTest
  {
 +    public static final String KEYSPACE5 = "Keyspace5";
 +    public static final String CF_STANDARD1 = "Standard1";
 +    public static final String CF_COUNTER = "Counter1";
  
 -    private static final String KEYSPACE1 = "Keyspace1";
 -    private static final String CF = "Standard1";
 +    public String cfname;
 +    public ColumnFamilyStore store;
 +    public InetAddress LOCAL, REMOTE;
 +
 +    private boolean initialized;
 +
 +    @BeforeClass
 +    public static void defineSchema() throws ConfigurationException
 +    {
 +        SchemaLoader.prepareServer();
 +        SchemaLoader.createKeyspace(KEYSPACE5,
 +                                    SimpleStrategy.class,
 +                                    KSMetaData.optsWithRF(2),
 +                                    SchemaLoader.standardCFMD(KEYSPACE5, CF_COUNTER),
 +                                    SchemaLoader.standardCFMD(KEYSPACE5, CF_STANDARD1));
 +    }
 +
 +    @Before
 +    public void prepare() throws Exception
 +    {
 +        if (!initialized)
 +        {
 +            SchemaLoader.startGossiper();
 +            initialized = true;
 +
 +            LOCAL = FBUtilities.getBroadcastAddress();
 +            // generate a fake endpoint for which we can spoof receiving/sending trees
 +            REMOTE = InetAddress.getByName("127.0.0.2");
 +        }
 +
 +        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
 +        tmd.clearUnsafe();
 +        StorageService.instance.setTokens(Collections.singleton(StorageService.getPartitioner().getRandomToken()));
 +        tmd.updateNormalToken(StorageService.getPartitioner().getMinimumToken(), REMOTE);
 +        assert tmd.isMember(REMOTE);
 +    }
 +
 +    @Test
 +    public void testGetNeighborsPlusOne() throws Throwable
 +    {
 +        // generate rf+1 nodes, and ensure that all nodes are returned
 +        Set<InetAddress> expected = addTokens(1 + Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
 +        expected.remove(FBUtilities.getBroadcastAddress());
 +        Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
 +        Set<InetAddress> neighbors = new HashSet<>();
 +        for (Range<Token> range : ranges)
 +        {
-             neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, null, null));
++            neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, null, null));
 +        }
 +        assertEquals(expected, neighbors);
 +    }
 +
 +    @Test
 +    public void testGetNeighborsTimesTwo() throws Throwable
 +    {
 +        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
 +
 +        // generate rf*2 nodes, and ensure that only neighbors specified by the ARS are returned
 +        addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
 +        AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy();
 +        Set<InetAddress> expected = new HashSet<>();
 +        for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress()))
 +        {
 +            expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
 +        }
 +        expected.remove(FBUtilities.getBroadcastAddress());
 +        Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
 +        Set<InetAddress> neighbors = new HashSet<>();
 +        for (Range<Token> range : ranges)
 +        {
-             neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, null, null));
++            neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, null, null));
 +        }
 +        assertEquals(expected, neighbors);
 +    }
 +
 +    @Test
 +    public void testGetNeighborsPlusOneInLocalDC() throws Throwable
 +    {
 +        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
 +
 +        // generate rf+1 nodes, and ensure that all nodes are returned
 +        Set<InetAddress> expected = addTokens(1 + Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
 +        expected.remove(FBUtilities.getBroadcastAddress());
 +        // remove remote endpoints
 +        TokenMetadata.Topology topology = tmd.cloneOnlyTokenMap().getTopology();
 +        HashSet<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));
 +        expected = Sets.intersection(expected, localEndpoints);
 +
 +        Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
 +        Set<InetAddress> neighbors = new HashSet<>();
 +        for (Range<Token> range : ranges)
 +        {
-             neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
++            neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
 +        }
 +        assertEquals(expected, neighbors);
 +    }
 +
 +    @Test
 +    public void testGetNeighborsTimesTwoInLocalDC() throws Throwable
 +    {
 +        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
 +
 +        // generate rf*2 nodes, and ensure that only neighbors specified by the ARS are returned
 +        addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
 +        AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy();
 +        Set<InetAddress> expected = new HashSet<>();
 +        for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress()))
 +        {
 +            expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
 +        }
 +        expected.remove(FBUtilities.getBroadcastAddress());
 +        // remove remote endpoints
 +        TokenMetadata.Topology topology = tmd.cloneOnlyTokenMap().getTopology();
 +        HashSet<InetAddress> localEndpoints = Sets.newHashSet(topology.getDatacenterEndpoints().get(DatabaseDescriptor.getLocalDataCenter()));
 +        expected = Sets.intersection(expected, localEndpoints);
 +
 +        Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
 +        Set<InetAddress> neighbors = new HashSet<>();
 +        for (Range<Token> range : ranges)
 +        {
-             neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
++            neighbors.addAll(ActiveRepairService.getNeighbors(KEYSPACE5, ranges, range, Arrays.asList(DatabaseDescriptor.getLocalDataCenter()), null));
 +        }
 +        assertEquals(expected, neighbors);
 +    }
 +
 +    @Test
 +    public void testGetNeighborsTimesTwoInSpecifiedHosts() throws Throwable
 +    {
 +        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
 +
 +        // generate rf*2 nodes, and ensure that only neighbors specified by the hosts are returned
 +        addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
 +        AbstractReplicationStrategy ars = Keyspace.open(KEYSPACE5).getReplicationStrategy();
 +        List<InetAddress> expected = new ArrayList<>();
 +        for (Range<Token> replicaRange : ars.getAddressRanges().get(FBUtilities.getBroadcastAddress()))
 +        {
 +            expected.addAll(ars.getRangeAddresses(tmd.cloneOnlyTokenMap()).get(replicaRange));
 +        }
 +
 +        expected.remove(FBUtilities.getBroadcastAddress());
 +        Collection<String> hosts = Arrays.asList(FBUtilities.getBroadcastAddress().getCanonicalHostName(),expected.get(0).getCanonicalHostName());
++        Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
 +
-        assertEquals(expected.get(0), ActiveRepairService.getNeighbors(KEYSPACE5,
-                                                                       StorageService.instance.getLocalRanges(KEYSPACE5).iterator().next(),
-                                                                       null, hosts).iterator().next());
++        assertEquals(expected.get(0), ActiveRepairService.getNeighbors(KEYSPACE5, ranges,
++                                                                       ranges.iterator().next(),
++                                                                       null, hosts).iterator().next());
 +    }
 +
 +    @Test(expected = IllegalArgumentException.class)
 +    public void testGetNeighborsSpecifiedHostsWithNoLocalHost() throws Throwable
 +    {
 +        addTokens(2 * Keyspace.open(KEYSPACE5).getReplicationStrategy().getReplicationFactor());
 +        //Dont give local endpoint
 +        Collection<String> hosts = Arrays.asList("127.0.0.3");
-         ActiveRepairService.getNeighbors(KEYSPACE5, StorageService.instance.getLocalRanges(KEYSPACE5).iterator().next(), null, hosts);
++        Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(KEYSPACE5);
++        ActiveRepairService.getNeighbors(KEYSPACE5, ranges, ranges.iterator().next(), null, hosts);
 +    }
 +
 +    Set<InetAddress> addTokens(int max) throws Throwable
 +    {
 +        TokenMetadata tmd = StorageService.instance.getTokenMetadata();
 +        Set<InetAddress> endpoints = new HashSet<>();
 +        for (int i = 1; i <= max; i++)
 +        {
 +            InetAddress endpoint = InetAddress.getByName("127.0.0." + i);
 +            tmd.updateNormalToken(StorageService.getPartitioner().getRandomToken(), endpoint);
 +            endpoints.add(endpoint);
 +        }
 +        return endpoints;
 +    }
  
      @Test
      public void testGetActiveRepairedSSTableRefs()

Reply via email to