http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java index 447d504..374a760 100644 --- a/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java +++ b/test/unit/org/apache/cassandra/db/repair/PendingAntiCompactionTest.java @@ -18,6 +18,7 @@ package org.apache.cassandra.db.repair; +import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -42,6 +43,8 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.compaction.CompactionManager; +import org.apache.cassandra.locator.RangesAtEndpoint; +import org.apache.cassandra.locator.Replica; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.Schema; @@ -64,6 +67,9 @@ public class PendingAntiCompactionTest { private static final Logger logger = LoggerFactory.getLogger(PendingAntiCompactionTest.class); private static final Collection<Range<Token>> FULL_RANGE; + private static final Collection<Range<Token>> NO_RANGES = Collections.emptyList(); + private static InetAddressAndPort local; + static { DatabaseDescriptor.daemonInitialization(); @@ -77,9 +83,10 @@ public class PendingAntiCompactionTest private ColumnFamilyStore cfs; @BeforeClass - public static void setupClass() + public static void setupClass() throws Throwable { SchemaLoader.prepareServer(); + local = InetAddressAndPort.getByName("127.0.0.1"); } @Before @@ -89,6 +96,7 @@ public class PendingAntiCompactionTest cfm = CreateTableStatement.parse(String.format("CREATE TABLE %s.%s (k INT PRIMARY 
KEY, v INT)", ks, tbl), ks).build(); SchemaLoader.createKeyspace(ks, KeyspaceParams.simple(1), cfm); cfs = Schema.instance.getColumnFamilyStoreInstance(cfm.id); + } private void makeSSTables(int num) @@ -105,7 +113,7 @@ public class PendingAntiCompactionTest private static class InstrumentedAcquisitionCallback extends PendingAntiCompaction.AcquisitionCallback { - public InstrumentedAcquisitionCallback(UUID parentRepairSession, Collection<Range<Token>> ranges) + public InstrumentedAcquisitionCallback(UUID parentRepairSession, RangesAtEndpoint ranges) { super(parentRepairSession, ranges); } @@ -155,7 +163,7 @@ public class PendingAntiCompactionTest ExecutorService executor = Executors.newSingleThreadExecutor(); try { - pac = new PendingAntiCompaction(sessionID, tables, ranges, executor); + pac = new PendingAntiCompaction(sessionID, tables, atEndpoint(ranges, NO_RANGES), executor); pac.run().get(); } finally @@ -217,7 +225,7 @@ public class PendingAntiCompactionTest Assert.assertTrue(repaired.intersects(FULL_RANGE)); Assert.assertTrue(unrepaired.intersects(FULL_RANGE)); - repaired.descriptor.getMetadataSerializer().mutateRepaired(repaired.descriptor, 1, null); + repaired.descriptor.getMetadataSerializer().mutateRepairMetadata(repaired.descriptor, 1, null, false); repaired.reloadSSTableMetadata(); PendingAntiCompaction.AcquisitionCallable acquisitionCallable = new PendingAntiCompaction.AcquisitionCallable(cfs, FULL_RANGE, UUIDGen.getTimeUUID()); @@ -243,7 +251,7 @@ public class PendingAntiCompactionTest Assert.assertTrue(repaired.intersects(FULL_RANGE)); Assert.assertTrue(unrepaired.intersects(FULL_RANGE)); - repaired.descriptor.getMetadataSerializer().mutateRepaired(repaired.descriptor, 0, UUIDGen.getTimeUUID()); + repaired.descriptor.getMetadataSerializer().mutateRepairMetadata(repaired.descriptor, 0, UUIDGen.getTimeUUID(), false); repaired.reloadSSTableMetadata(); Assert.assertTrue(repaired.isPendingRepair()); @@ -284,7 +292,7 @@ public class 
PendingAntiCompactionTest PendingAntiCompaction.AcquireResult result = acquisitionCallable.call(); Assert.assertNotNull(result); - InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), FULL_RANGE); + InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), atEndpoint(FULL_RANGE, NO_RANGES)); Assert.assertTrue(cb.submittedCompactions.isEmpty()); cb.apply(Lists.newArrayList(result)); @@ -308,7 +316,7 @@ public class PendingAntiCompactionTest Assert.assertNotNull(result); Assert.assertEquals(Transactional.AbstractTransactional.State.IN_PROGRESS, result.txn.state()); - InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), FULL_RANGE); + InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), atEndpoint(FULL_RANGE, Collections.emptyList())); Assert.assertTrue(cb.submittedCompactions.isEmpty()); cb.apply(Lists.newArrayList(result, null)); @@ -333,7 +341,7 @@ public class PendingAntiCompactionTest ColumnFamilyStore cfs2 = Schema.instance.getColumnFamilyStoreInstance(Schema.instance.getTableMetadata("system", "peers").id); PendingAntiCompaction.AcquireResult fakeResult = new PendingAntiCompaction.AcquireResult(cfs2, null, null); - InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), FULL_RANGE); + InstrumentedAcquisitionCallback cb = new InstrumentedAcquisitionCallback(UUIDGen.getTimeUUID(), atEndpoint(FULL_RANGE, NO_RANGES)); Assert.assertTrue(cb.submittedCompactions.isEmpty()); cb.apply(Lists.newArrayList(result, fakeResult)); @@ -359,8 +367,19 @@ public class PendingAntiCompactionTest true,0, true, PreviewKind.NONE); - CompactionManager.instance.performAnticompaction(result.cfs, FULL_RANGE, result.refs, result.txn, - ActiveRepairService.UNREPAIRED_SSTABLE, sessionID, sessionID); + CompactionManager.instance.performAnticompaction(result.cfs, atEndpoint(FULL_RANGE, 
NO_RANGES), result.refs, result.txn, sessionID); + + } + + private static RangesAtEndpoint atEndpoint(Collection<Range<Token>> full, Collection<Range<Token>> trans) + { + RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(local); + for (Range<Token> range : full) + builder.add(new Replica(local, range, true)); + + for (Range<Token> range : trans) + builder.add(new Replica(local, range, false)); + return builder.build(); } }
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java index 8256ac6..5e44346 100644 --- a/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java +++ b/test/unit/org/apache/cassandra/db/streaming/CassandraOutgoingFileTest.java @@ -114,6 +114,7 @@ public class CassandraOutgoingFileTest List<Range<Token>> requestedRanges = Arrays.asList(new Range<>(store.getPartitioner().getMinimumToken(), getTokenAtIndex(4)), new Range<>(getTokenAtIndex(2), getTokenAtIndex(6)), new Range<>(getTokenAtIndex(5), sstable.last.getToken())); + requestedRanges = Range.normalize(requestedRanges); CassandraOutgoingFile cof = new CassandraOutgoingFile(StreamOperation.BOOTSTRAP, sstable.ref(), sstable.getPositionsForRanges(requestedRanges), http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java index 86018af..b597bfe 100644 --- a/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java +++ b/test/unit/org/apache/cassandra/db/streaming/CassandraStreamManagerTest.java @@ -33,6 +33,8 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import com.google.common.util.concurrent.Uninterruptibles; +import org.apache.cassandra.locator.EndpointsForRange; +import org.apache.cassandra.locator.RangesAtEndpoint; import org.junit.Assert; import org.junit.Before; import 
org.junit.BeforeClass; @@ -118,10 +120,10 @@ public class CassandraStreamManagerTest return Iterables.getOnlyElement(diff); } - private static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair) throws IOException + private static void mutateRepaired(SSTableReader sstable, long repairedAt, UUID pendingRepair, boolean isTransient) throws IOException { Descriptor descriptor = sstable.descriptor; - descriptor.getMetadataSerializer().mutateRepaired(descriptor, repairedAt, pendingRepair); + descriptor.getMetadataSerializer().mutateRepairMetadata(descriptor, repairedAt, pendingRepair, isTransient); sstable.reloadSSTableMetadata(); } @@ -141,7 +143,7 @@ public class CassandraStreamManagerTest private Set<SSTableReader> getReadersForRange(Range<Token> range) { Collection<OutgoingStream> streams = cfs.getStreamManager().createOutgoingStreams(session(NO_PENDING_REPAIR), - Collections.singleton(range), + RangesAtEndpoint.toDummyList(Collections.singleton(range)), NO_PENDING_REPAIR, PreviewKind.NONE); return sstablesFromStreams(streams); @@ -151,7 +153,7 @@ public class CassandraStreamManagerTest { IPartitioner partitioner = DatabaseDescriptor.getPartitioner(); Collection<Range<Token>> ranges = Lists.newArrayList(new Range<Token>(partitioner.getMinimumToken(), partitioner.getMinimumToken())); - Collection<OutgoingStream> streams = cfs.getStreamManager().createOutgoingStreams(session(pendingRepair), ranges, pendingRepair, PreviewKind.NONE); + Collection<OutgoingStream> streams = cfs.getStreamManager().createOutgoingStreams(session(pendingRepair), RangesAtEndpoint.toDummyList(ranges), pendingRepair, PreviewKind.NONE); return sstablesFromStreams(streams); } @@ -167,9 +169,9 @@ public class CassandraStreamManagerTest UUID pendingRepair = UUIDGen.getTimeUUID(); long repairedAt = System.currentTimeMillis(); - mutateRepaired(sstable2, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair); - mutateRepaired(sstable3, ActiveRepairService.UNREPAIRED_SSTABLE, 
UUIDGen.getTimeUUID()); - mutateRepaired(sstable4, repairedAt, NO_PENDING_REPAIR); + mutateRepaired(sstable2, ActiveRepairService.UNREPAIRED_SSTABLE, pendingRepair, false); + mutateRepaired(sstable3, ActiveRepairService.UNREPAIRED_SSTABLE, UUIDGen.getTimeUUID(), false); + mutateRepaired(sstable4, repairedAt, NO_PENDING_REPAIR, false); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/streaming/StreamRequestTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/streaming/StreamRequestTest.java b/test/unit/org/apache/cassandra/db/streaming/StreamRequestTest.java new file mode 100644 index 0000000..2a6cb65 --- /dev/null +++ b/test/unit/org/apache/cassandra/db/streaming/StreamRequestTest.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.db.streaming; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.ByteOrderedPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.RangesAtEndpoint; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.streaming.StreamRequest; + +public class StreamRequestTest +{ + private static InetAddressAndPort local; + private final String ks = "keyspace"; + private final int version = MessagingService.current_version; + + @BeforeClass + public static void setUp() throws Throwable + { + DatabaseDescriptor.daemonInitialization(); + local = InetAddressAndPort.getByName("127.0.0.1"); + } + + @Test + public void serializationRoundTrip() throws Throwable + { + StreamRequest orig = new StreamRequest(ks, + atEndpoint(Arrays.asList(range(1, 2), range(3, 4), range(5, 6)), + Collections.emptyList()), + atEndpoint(Collections.emptyList(), + Arrays.asList(range(5, 6), range(7, 8))), + Arrays.asList("a", "b", "c")); + + int expectedSize = (int) StreamRequest.serializer.serializedSize(orig, version); + try (DataOutputBuffer out = new DataOutputBuffer(expectedSize)) + { + StreamRequest.serializer.serialize(orig, out, version); + Assert.assertEquals(expectedSize, out.buffer().limit()); + try (DataInputBuffer in = new DataInputBuffer(out.buffer(), false)) + { + StreamRequest decoded = StreamRequest.serializer.deserialize(in, version); + + Assert.assertEquals(orig.keyspace, decoded.keyspace); + Assert.assertEquals(orig.full, decoded.full); + 
Assert.assertEquals(orig.transientReplicas, decoded.transientReplicas); + Assert.assertEquals(orig.columnFamilies, decoded.columnFamilies); + } + } + } + + private static RangesAtEndpoint atEndpoint(Collection<Range<Token>> full, Collection<Range<Token>> trans) + { + RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(local); + for (Range<Token> range : full) + builder.add(new Replica(local, range, true)); + + for (Range<Token> range : trans) + builder.add(new Replica(local, range, false)); + + return builder.build(); + } + + private static Range<Token> range(int l, int r) + { + return new Range<>(new ByteOrderedPartitioner.BytesToken(Integer.toString(l).getBytes()), + new ByteOrderedPartitioner.BytesToken(Integer.toString(r).getBytes())); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java b/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java index ec0f6b1..7eebef7 100644 --- a/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java +++ b/test/unit/org/apache/cassandra/db/view/ViewUtilsTest.java @@ -28,6 +28,7 @@ import org.junit.Test; import org.junit.Assert; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.OrderPreservingPartitioner.StringToken; @@ -76,12 +77,12 @@ public class ViewUtilsTest KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, replicationMap)); Schema.instance.load(meta); - Optional<InetAddressAndPort> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1", - new StringToken("CA"), - new StringToken("BB")); + Optional<Replica> naturalEndpoint = 
ViewUtils.getViewNaturalEndpoint("Keyspace1", + new StringToken("CA"), + new StringToken("BB")); Assert.assertTrue(naturalEndpoint.isPresent()); - Assert.assertEquals(InetAddressAndPort.getByName("127.0.0.2"), naturalEndpoint.get()); + Assert.assertEquals(InetAddressAndPort.getByName("127.0.0.2"), naturalEndpoint.get().endpoint()); } @@ -109,12 +110,12 @@ public class ViewUtilsTest KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, replicationMap)); Schema.instance.load(meta); - Optional<InetAddressAndPort> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1", - new StringToken("CA"), - new StringToken("BB")); + Optional<Replica> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1", + new StringToken("CA"), + new StringToken("BB")); Assert.assertTrue(naturalEndpoint.isPresent()); - Assert.assertEquals(InetAddressAndPort.getByName("127.0.0.1"), naturalEndpoint.get()); + Assert.assertEquals(InetAddressAndPort.getByName("127.0.0.1"), naturalEndpoint.get().endpoint()); } @Test @@ -141,9 +142,9 @@ public class ViewUtilsTest KeyspaceMetadata meta = KeyspaceMetadata.create("Keyspace1", KeyspaceParams.create(false, replicationMap)); Schema.instance.load(meta); - Optional<InetAddressAndPort> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1", - new StringToken("AB"), - new StringToken("BB")); + Optional<Replica> naturalEndpoint = ViewUtils.getViewNaturalEndpoint("Keyspace1", + new StringToken("AB"), + new StringToken("BB")); Assert.assertFalse(naturalEndpoint.isPresent()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/dht/BootStrapperTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java index f11cb62..8ae6853 100644 --- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java +++ 
b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java @@ -18,18 +18,20 @@ package org.apache.cassandra.dht; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; import java.net.UnknownHostException; import java.util.Collection; -import java.util.HashSet; import java.util.List; -import java.util.Map; -import java.util.Set; import java.util.UUID; +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; +import org.apache.cassandra.dht.RangeStreamer.FetchReplica; import org.apache.commons.math3.stat.descriptive.SummaryStatistics; import org.junit.AfterClass; @@ -41,6 +43,7 @@ import org.apache.cassandra.OrderedJUnit4ClassRunner; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.dht.tokenallocator.TokenAllocation; @@ -60,6 +63,7 @@ public class BootStrapperTest { static IPartitioner oldPartitioner; + static Predicate<Replica> originalAlivePredicate = RangeStreamer.ALIVE_PREDICATE; @BeforeClass public static void setup() throws ConfigurationException { @@ -68,12 +72,14 @@ public class BootStrapperTest SchemaLoader.startGossiper(); SchemaLoader.prepareServer(); SchemaLoader.schemaDefinition("BootStrapperTest"); + RangeStreamer.ALIVE_PREDICATE = Predicates.alwaysTrue(); } @AfterClass public static void tearDown() { DatabaseDescriptor.setPartitionerUnsafe(oldPartitioner); + RangeStreamer.ALIVE_PREDICATE = originalAlivePredicate; } @Test @@ -82,7 +88,7 @@ public class BootStrapperTest final int[] clusterSizes = new int[] { 1, 3, 5, 10, 100}; for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces()) { - int 
replicationFactor = Keyspace.open(keyspaceName).getReplicationStrategy().getReplicationFactor(); + int replicationFactor = Keyspace.open(keyspaceName).getReplicationStrategy().getReplicationFactor().allReplicas; for (int clusterSize : clusterSizes) if (clusterSize >= replicationFactor) testSourceTargetComputation(keyspaceName, clusterSize, replicationFactor); @@ -115,21 +121,25 @@ public class BootStrapperTest public void forceConviction(InetAddressAndPort ep) { throw new UnsupportedOperationException(); } }; s.addSourceFilter(new RangeStreamer.FailureDetectorSourceFilter(mockFailureDetector)); + assertNotNull(Keyspace.open(keyspaceName)); s.addRanges(keyspaceName, Keyspace.open(keyspaceName).getReplicationStrategy().getPendingAddressRanges(tmd, myToken, myEndpoint)); - Collection<Map.Entry<InetAddressAndPort, Collection<Range<Token>>>> toFetch = s.toFetch().get(keyspaceName); - // Check we get get RF new ranges in total - Set<Range<Token>> ranges = new HashSet<>(); - for (Map.Entry<InetAddressAndPort, Collection<Range<Token>>> e : toFetch) - ranges.addAll(e.getValue()); + Collection<Multimap<InetAddressAndPort, FetchReplica>> toFetch = s.toFetch().get(keyspaceName); - assertEquals(replicationFactor, ranges.size()); + // Check we get RF new ranges in total + long rangesCount = toFetch.stream() + .map(Multimap::values) + .flatMap(Collection::stream) + .map(f -> f.remote) + .map(Replica::range) + .count(); + assertEquals(replicationFactor, rangesCount); // there isn't any point in testing the size of these collections for any specific size. When a random partitioner // is used, they will vary. 
- assert toFetch.iterator().next().getValue().size() > 0; - assert !toFetch.iterator().next().getKey().equals(myEndpoint); + assert toFetch.stream().map(Multimap::values).flatMap(Collection::stream).count() > 0; + assert toFetch.stream().map(Multimap::keySet).flatMap(Collection::stream).noneMatch(myEndpoint::equals); return s; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java b/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java index 78e87c1..07d6377 100644 --- a/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java +++ b/test/unit/org/apache/cassandra/dht/RangeFetchMapCalculatorTest.java @@ -25,8 +25,9 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; -import com.google.common.collect.HashMultimap; +import com.google.common.base.Predicate; import com.google.common.collect.Multimap; +import org.apache.cassandra.locator.EndpointsByRange; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -34,6 +35,8 @@ import org.junit.Test; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.locator.AbstractNetworkTopologySnitch; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.ReplicaUtils; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -78,14 +81,14 @@ public class RangeFetchMapCalculatorTest @Test public void testWithSingleSource() throws Exception { - Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create(); + EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable(); addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1"); 
addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2"); addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3"); addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.4"); addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.5"); - RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test"); + RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.emptyList(), "Test"); Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap(); validateRange(rangesWithSources, map); @@ -95,14 +98,14 @@ public class RangeFetchMapCalculatorTest @Test public void testWithNonOverlappingSource() throws Exception { - Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create(); + EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable(); addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.2"); addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3", "127.0.0.4"); addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.5", "127.0.0.6"); addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.7", "127.0.0.8"); addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.9", "127.0.0.10"); - RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test"); + RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.emptyList(), "Test"); Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap(); validateRange(rangesWithSources, map); @@ -112,12 +115,12 @@ public class RangeFetchMapCalculatorTest @Test public void testWithRFThreeReplacement() throws Exception { - Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = 
HashMultimap.create(); + EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable(); addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.2"); addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2", "127.0.0.3"); addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3", "127.0.0.4"); - RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test"); + RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.emptyList(), "Test"); Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap(); validateRange(rangesWithSources, map); @@ -128,14 +131,14 @@ public class RangeFetchMapCalculatorTest @Test public void testForMultipleRoundsComputation() throws Exception { - Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create(); + EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable(); addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3"); addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3"); addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3"); addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.3"); addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.3", "127.0.0.2"); - RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test"); + RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.emptyList(), "Test"); Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap(); validateRange(rangesWithSources, map); @@ -150,14 +153,14 @@ public class RangeFetchMapCalculatorTest @Test public void testForMultipleRoundsComputationWithLocalHost() throws Exception { - Multimap<Range<Token>, 
InetAddressAndPort> rangesWithSources = HashMultimap.create(); + EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable(); addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1"); addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.1"); addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.1"); addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.1"); addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.1", "127.0.0.2"); - RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test"); + RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.emptyList(), "Test"); Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap(); validateRange(rangesWithSources, map); @@ -170,14 +173,14 @@ public class RangeFetchMapCalculatorTest @Test public void testForEmptyGraph() throws Exception { - Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create(); + EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable(); addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1"); addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.1"); addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.1"); addNonTrivialRangeAndSources(rangesWithSources, 31, 40, "127.0.0.1"); addNonTrivialRangeAndSources(rangesWithSources, 41, 50, "127.0.0.1"); - RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<RangeStreamer.ISourceFilter>(), "Test"); + RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.emptyList(), "Test"); Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap(); //All ranges map to local host so we will not stream anything. 
assertTrue(map.isEmpty()); @@ -186,31 +189,28 @@ public class RangeFetchMapCalculatorTest @Test public void testWithNoSourceWithLocal() throws Exception { - Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create(); + EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable(); addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.5"); addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2"); addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3"); //Return false for all except 127.0.0.5 - final RangeStreamer.ISourceFilter filter = new RangeStreamer.ISourceFilter() + final Predicate<Replica> filter = replica -> { - public boolean shouldInclude(InetAddressAndPort endpoint) + try { - try - { - if (endpoint.equals(InetAddressAndPort.getByName("127.0.0.5"))) - return false; - else - return true; - } - catch (UnknownHostException e) - { + if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.5"))) + return false; + else return true; - } + } + catch (UnknownHostException e) + { + return true; } }; - RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, Arrays.asList(filter), "Test"); + RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Arrays.asList(filter), "Test"); Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap(); validateRange(rangesWithSources, map); @@ -225,32 +225,26 @@ public class RangeFetchMapCalculatorTest @Test (expected = IllegalStateException.class) public void testWithNoLiveSource() throws Exception { - Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create(); + EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable(); addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.5"); addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.2"); 
addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.3"); - final RangeStreamer.ISourceFilter allDeadFilter = new RangeStreamer.ISourceFilter() - { - public boolean shouldInclude(InetAddressAndPort endpoint) - { - return false; - } - }; + final Predicate<Replica> allDeadFilter = replica -> false; - RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, Arrays.asList(allDeadFilter), "Test"); + RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Arrays.asList(allDeadFilter), "Test"); calculator.getRangeFetchMap(); } @Test public void testForLocalDC() throws Exception { - Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create(); + EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable(); addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.1", "127.0.0.3", "127.0.0.53"); addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.1", "127.0.0.3", "127.0.0.57"); addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.2", "127.0.0.59", "127.0.0.61"); - RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, new ArrayList<>(), "Test"); + RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), new ArrayList<>(), "Test"); Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap(); validateRange(rangesWithSources, map); Assert.assertEquals(2, map.asMap().size()); @@ -263,31 +257,28 @@ public class RangeFetchMapCalculatorTest @Test public void testForRemoteDC() throws Exception { - Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create(); + EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable(); addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3", "127.0.0.51"); addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3", "127.0.0.55"); 
addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.2", "127.0.0.59"); //Reject only 127.0.0.3 and accept everyone else - final RangeStreamer.ISourceFilter localHostFilter = new RangeStreamer.ISourceFilter() + final Predicate<Replica> localHostFilter = replica -> { - public boolean shouldInclude(InetAddressAndPort endpoint) + try { - try - { - if (endpoint.equals(InetAddressAndPort.getByName("127.0.0.3"))) - return false; - else - return true; - } - catch (UnknownHostException e) - { + if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.3"))) + return false; + else return true; - } + } + catch (UnknownHostException e) + { + return true; } }; - RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, Arrays.asList(localHostFilter), "Test"); + RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Arrays.asList(localHostFilter), "Test"); Multimap<InetAddressAndPort, Range<Token>> map = calculator.getRangeFetchMap(); validateRange(rangesWithSources, map); Assert.assertEquals(3, map.asMap().size()); @@ -301,14 +292,14 @@ public class RangeFetchMapCalculatorTest @Test public void testTrivialRanges() throws UnknownHostException { - Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create(); + EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable(); // add non-trivial ranges addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3", "127.0.0.51"); addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3", "127.0.0.55"); addNonTrivialRangeAndSources(rangesWithSources, 21, 30, "127.0.0.2", "127.0.0.59"); // and a trivial one: addTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3", "127.0.0.51"); - RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, Collections.emptyList(), "Test"); + RangeFetchMapCalculator calculator = new 
RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.emptyList(), "Test"); Multimap<InetAddressAndPort, Range<Token>> optMap = calculator.getRangeFetchMapForNonTrivialRanges(); Multimap<InetAddressAndPort, Range<Token>> trivialMap = calculator.getRangeFetchMapForTrivialRanges(optMap); assertTrue(trivialMap.get(InetAddressAndPort.getByName("127.0.0.3")).contains(generateTrivialRange(1,10)) ^ @@ -319,7 +310,7 @@ public class RangeFetchMapCalculatorTest @Test(expected = IllegalStateException.class) public void testNotEnoughEndpointsForTrivialRange() throws UnknownHostException { - Multimap<Range<Token>, InetAddressAndPort> rangesWithSources = HashMultimap.create(); + EndpointsByRange.Mutable rangesWithSources = new EndpointsByRange.Mutable(); // add non-trivial ranges addNonTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3", "127.0.0.51"); addNonTrivialRangeAndSources(rangesWithSources, 11, 20, "127.0.0.3", "127.0.0.55"); @@ -327,23 +318,20 @@ public class RangeFetchMapCalculatorTest // and a trivial one: addTrivialRangeAndSources(rangesWithSources, 1, 10, "127.0.0.3"); - RangeStreamer.ISourceFilter filter = new RangeStreamer.ISourceFilter() + Predicate<Replica> filter = replica -> { - public boolean shouldInclude(InetAddressAndPort endpoint) + try { - try - { - if (endpoint.equals(InetAddressAndPort.getByName("127.0.0.3"))) - return false; - } - catch (UnknownHostException e) - { - throw new RuntimeException(e); - } - return true; + if (replica.endpoint().equals(InetAddressAndPort.getByName("127.0.0.3"))) + return false; + } + catch (UnknownHostException e) + { + throw new RuntimeException(e); } + return true; }; - RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources, Collections.singleton(filter), "Test"); + RangeFetchMapCalculator calculator = new RangeFetchMapCalculator(rangesWithSources.asImmutableView(), Collections.singleton(filter), "Test"); Multimap<InetAddressAndPort, Range<Token>> optMap = 
calculator.getRangeFetchMapForNonTrivialRanges(); Multimap<InetAddressAndPort, Range<Token>> trivialMap = calculator.getRangeFetchMapForTrivialRanges(optMap); @@ -355,27 +343,29 @@ public class RangeFetchMapCalculatorTest assertTrue(result.containsAll(expected)); } - private void validateRange(Multimap<Range<Token>, InetAddressAndPort> rangesWithSources, Multimap<InetAddressAndPort, Range<Token>> result) + private void validateRange(EndpointsByRange.Mutable rangesWithSources, Multimap<InetAddressAndPort, Range<Token>> result) { for (Map.Entry<InetAddressAndPort, Range<Token>> entry : result.entries()) { - assertTrue(rangesWithSources.get(entry.getValue()).contains(entry.getKey())); + assertTrue(rangesWithSources.get(entry.getValue()).endpoints().contains(entry.getKey())); } } - private void addNonTrivialRangeAndSources(Multimap<Range<Token>, InetAddressAndPort> rangesWithSources, int left, int right, String... hosts) throws UnknownHostException + private void addNonTrivialRangeAndSources(EndpointsByRange.Mutable rangesWithSources, int left, int right, String... hosts) throws UnknownHostException { for (InetAddressAndPort endpoint : makeAddrs(hosts)) { - rangesWithSources.put(generateNonTrivialRange(left, right), endpoint); + Range<Token> range = generateNonTrivialRange(left, right); + rangesWithSources.put(range, Replica.fullReplica(endpoint, range)); } } - private void addTrivialRangeAndSources(Multimap<Range<Token>, InetAddressAndPort> rangesWithSources, int left, int right, String... hosts) throws UnknownHostException + private void addTrivialRangeAndSources(EndpointsByRange.Mutable rangesWithSources, int left, int right, String... 
hosts) throws UnknownHostException { for (InetAddressAndPort endpoint : makeAddrs(hosts)) { - rangesWithSources.put(generateTrivialRange(left, right), endpoint); + Range<Token> range = generateTrivialRange(left, right); + rangesWithSources.put(range, Replica.fullReplica(endpoint, range)); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/dht/RangeTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/RangeTest.java b/test/unit/org/apache/cassandra/dht/RangeTest.java index 495979e..36a8da1 100644 --- a/test/unit/org/apache/cassandra/dht/RangeTest.java +++ b/test/unit/org/apache/cassandra/dht/RangeTest.java @@ -642,7 +642,7 @@ public class RangeTest Range.OrderedRangeContainmentChecker checker = new Range.OrderedRangeContainmentChecker(ranges); for (Token t : tokensToTest) { - if (checker.contains(t) != Range.isInRanges(t, ranges)) // avoid running Joiner.on(..) every iteration + if (checker.test(t) != Range.isInRanges(t, ranges)) // avoid running Joiner.on(..) every iteration fail(String.format("This should never flap! 
If it does, it is a bug (ranges = %s, token = %s)", Joiner.on(",").join(ranges), t)); } } @@ -653,11 +653,11 @@ public class RangeTest { List<Range<Token>> ranges = asList(r(Long.MIN_VALUE, Long.MIN_VALUE + 1), r(Long.MAX_VALUE - 1, Long.MAX_VALUE)); Range.OrderedRangeContainmentChecker checker = new Range.OrderedRangeContainmentChecker(ranges); - assertFalse(checker.contains(t(Long.MIN_VALUE))); - assertTrue(checker.contains(t(Long.MIN_VALUE + 1))); - assertFalse(checker.contains(t(0))); - assertFalse(checker.contains(t(Long.MAX_VALUE - 1))); - assertTrue(checker.contains(t(Long.MAX_VALUE))); + assertFalse(checker.test(t(Long.MIN_VALUE))); + assertTrue(checker.test(t(Long.MIN_VALUE + 1))); + assertFalse(checker.test(t(0))); + assertFalse(checker.test(t(Long.MAX_VALUE - 1))); + assertTrue(checker.test(t(Long.MAX_VALUE))); } private static Range<Token> r(long left, long right) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/dht/SplitterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/SplitterTest.java b/test/unit/org/apache/cassandra/dht/SplitterTest.java index 322a57c..c591499 100644 --- a/test/unit/org/apache/cassandra/dht/SplitterTest.java +++ b/test/unit/org/apache/cassandra/dht/SplitterTest.java @@ -62,13 +62,54 @@ public class SplitterTest randomSplitTestVNodes(new Murmur3Partitioner()); } + @Test + public void testWithWeight() + { + List<Splitter.WeightedRange> ranges = new ArrayList<>(); + ranges.add(new Splitter.WeightedRange(1.0, t(0, 10))); + ranges.add(new Splitter.WeightedRange(1.0, t(20, 30))); + ranges.add(new Splitter.WeightedRange(0.5, t(40, 60))); + + List<Splitter.WeightedRange> ranges2 = new ArrayList<>(); + ranges2.add(new Splitter.WeightedRange(1.0, t(0, 10))); + ranges2.add(new Splitter.WeightedRange(1.0, t(20, 30))); + ranges2.add(new Splitter.WeightedRange(1.0, t(40, 50))); + IPartitioner partitioner = 
Murmur3Partitioner.instance; + Splitter splitter = partitioner.splitter().get(); + + assertEquals(splitter.splitOwnedRanges(2, ranges, false), splitter.splitOwnedRanges(2, ranges2, false)); + } + + @Test + public void testWithWeight2() + { + List<Splitter.WeightedRange> ranges = new ArrayList<>(); + ranges.add(new Splitter.WeightedRange(0.2, t(0, 10))); + ranges.add(new Splitter.WeightedRange(1.0, t(20, 30))); + ranges.add(new Splitter.WeightedRange(1.0, t(40, 50))); + + List<Splitter.WeightedRange> ranges2 = new ArrayList<>(); + ranges2.add(new Splitter.WeightedRange(1.0, t(0, 2))); + ranges2.add(new Splitter.WeightedRange(1.0, t(20, 30))); + ranges2.add(new Splitter.WeightedRange(1.0, t(40, 50))); + IPartitioner partitioner = Murmur3Partitioner.instance; + Splitter splitter = partitioner.splitter().get(); + + assertEquals(splitter.splitOwnedRanges(2, ranges, false), splitter.splitOwnedRanges(2, ranges2, false)); + } + + private Range<Token> t(long left, long right) + { + return new Range<>(new Murmur3Partitioner.LongToken(left), new Murmur3Partitioner.LongToken(right)); + } + private static void randomSplitTestNoVNodes(IPartitioner partitioner) { Splitter splitter = getSplitter(partitioner); Random r = new Random(); for (int i = 0; i < 10000; i++) { - List<Range<Token>> localRanges = generateLocalRanges(1, r.nextInt(4) + 1, splitter, r, partitioner instanceof RandomPartitioner); + List<Splitter.WeightedRange> localRanges = generateLocalRanges(1, r.nextInt(4) + 1, splitter, r, partitioner instanceof RandomPartitioner); List<Token> boundaries = splitter.splitOwnedRanges(r.nextInt(9) + 1, localRanges, false); assertTrue("boundaries = " + boundaries + " ranges = " + localRanges, assertRangeSizeEqual(localRanges, boundaries, partitioner, splitter, true)); } @@ -84,14 +125,14 @@ public class SplitterTest int numTokens = 172 + r.nextInt(128); int rf = r.nextInt(4) + 2; int parts = r.nextInt(5) + 1; - List<Range<Token>> localRanges = generateLocalRanges(numTokens, rf, 
splitter, r, partitioner instanceof RandomPartitioner); + List<Splitter.WeightedRange> localRanges = generateLocalRanges(numTokens, rf, splitter, r, partitioner instanceof RandomPartitioner); List<Token> boundaries = splitter.splitOwnedRanges(parts, localRanges, true); if (!assertRangeSizeEqual(localRanges, boundaries, partitioner, splitter, false)) fail(String.format("Could not split %d tokens with rf=%d into %d parts (localRanges=%s, boundaries=%s)", numTokens, rf, parts, localRanges, boundaries)); } } - private static boolean assertRangeSizeEqual(List<Range<Token>> localRanges, List<Token> tokens, IPartitioner partitioner, Splitter splitter, boolean splitIndividualRanges) + private static boolean assertRangeSizeEqual(List<Splitter.WeightedRange> localRanges, List<Token> tokens, IPartitioner partitioner, Splitter splitter, boolean splitIndividualRanges) { Token start = partitioner.getMinimumToken(); List<BigInteger> splits = new ArrayList<>(); @@ -119,27 +160,27 @@ public class SplitterTest return allBalanced; } - private static BigInteger sumOwnedBetween(List<Range<Token>> localRanges, Token start, Token end, Splitter splitter, boolean splitIndividualRanges) + private static BigInteger sumOwnedBetween(List<Splitter.WeightedRange> localRanges, Token start, Token end, Splitter splitter, boolean splitIndividualRanges) { BigInteger sum = BigInteger.ZERO; - for (Range<Token> range : localRanges) + for (Splitter.WeightedRange range : localRanges) { if (splitIndividualRanges) { - Set<Range<Token>> intersections = new Range<>(start, end).intersectionWith(range); + Set<Range<Token>> intersections = new Range<>(start, end).intersectionWith(range.range()); for (Range<Token> intersection : intersections) sum = sum.add(splitter.valueForToken(intersection.right).subtract(splitter.valueForToken(intersection.left))); } else { - if (new Range<>(start, end).contains(range.left)) - sum = sum.add(splitter.valueForToken(range.right).subtract(splitter.valueForToken(range.left))); + 
if (new Range<>(start, end).contains(range.left())) + sum = sum.add(splitter.valueForToken(range.right()).subtract(splitter.valueForToken(range.left()))); } } return sum; } - private static List<Range<Token>> generateLocalRanges(int numTokens, int rf, Splitter splitter, Random r, boolean randomPartitioner) + private static List<Splitter.WeightedRange> generateLocalRanges(int numTokens, int rf, Splitter splitter, Random r, boolean randomPartitioner) { int localTokens = numTokens * rf; List<Token> randomTokens = new ArrayList<>(); @@ -152,11 +193,11 @@ public class SplitterTest Collections.sort(randomTokens); - List<Range<Token>> localRanges = new ArrayList<>(localTokens); + List<Splitter.WeightedRange> localRanges = new ArrayList<>(localTokens); for (int i = 0; i < randomTokens.size() - 1; i++) { assert randomTokens.get(i).compareTo(randomTokens.get(i + 1)) < 0; - localRanges.add(new Range<>(randomTokens.get(i), randomTokens.get(i + 1))); + localRanges.add(new Splitter.WeightedRange(1.0, new Range<>(randomTokens.get(i), randomTokens.get(i + 1)))); i++; } return localRanges; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java index bf71c09..34096a7 100644 --- a/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java +++ b/test/unit/org/apache/cassandra/dht/StreamStateStoreTest.java @@ -19,6 +19,7 @@ package org.apache.cassandra.dht; import java.util.Collections; +import org.apache.cassandra.locator.RangesAtEndpoint; import org.junit.BeforeClass; import org.junit.Test; @@ -53,7 +54,7 @@ public class StreamStateStoreTest InetAddressAndPort local = FBUtilities.getBroadcastAddressAndPort(); StreamSession session = new StreamSession(StreamOperation.BOOTSTRAP, local, new 
DefaultConnectionFactory(), 0, null, PreviewKind.NONE); - session.addStreamRequest("keyspace1", Collections.singleton(range), Collections.singleton("cf")); + session.addStreamRequest("keyspace1", RangesAtEndpoint.toDummyList(Collections.singleton(range)), RangesAtEndpoint.toDummyList(Collections.emptyList()), Collections.singleton("cf")); StreamStateStore store = new StreamStateStore(); // session complete event that is not completed makes data not available for keyspace/ranges @@ -74,7 +75,7 @@ public class StreamStateStoreTest // add different range within the same keyspace Range<Token> range2 = new Range<>(factory.fromString("100"), factory.fromString("200")); session = new StreamSession(StreamOperation.BOOTSTRAP, local, new DefaultConnectionFactory(), 0, null, PreviewKind.NONE); - session.addStreamRequest("keyspace1", Collections.singleton(range2), Collections.singleton("cf")); + session.addStreamRequest("keyspace1", RangesAtEndpoint.toDummyList(Collections.singleton(range2)), RangesAtEndpoint.toDummyList(Collections.emptyList()), Collections.singleton("cf")); session.state(StreamSession.State.COMPLETE); store.handleStreamEvent(new StreamEvent.SessionCompleteEvent(session)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java b/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java index 833ee8b..0710945 100644 --- a/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java +++ b/test/unit/org/apache/cassandra/gms/PendingRangeCalculatorServiceTest.java @@ -62,7 +62,7 @@ public class PendingRangeCalculatorServiceTest @BMRule(name = "Block pending range calculation", targetClass = "TokenMetadata", targetMethod = "calculatePendingRanges", - targetLocation = "AT INVOKE 
org.apache.cassandra.locator.AbstractReplicationStrategy.getAddressRanges", + targetLocation = "AT INVOKE org.apache.cassandra.locator.AbstractReplicationStrategy.getAddressReplicas", action = "org.apache.cassandra.gms.PendingRangeCalculatorServiceTest.calculationLock.lock()") public void testDelayedResponse() throws UnknownHostException, InterruptedException { http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java index fccb344..9e3594b 100644 --- a/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/BigTableWriterTest.java @@ -69,7 +69,7 @@ public class BigTableWriterTest extends AbstractTransactionalTest private TestableBTW(Descriptor desc) { - this(desc, SSTableTxnWriter.create(cfs, desc, 0, 0, null, + this(desc, SSTableTxnWriter.create(cfs, desc, 0, 0, null, false, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS))); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java index dbb929d..faf46bc 100644 --- a/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/CQLSSTableWriterTest.java @@ -649,7 +649,7 @@ public class CQLSSTableWriterTest public void init(String keyspace) { this.keyspace = keyspace; - for (Range<Token> range : StorageService.instance.getLocalRanges(ks)) + for (Range<Token> range : 
StorageService.instance.getLocalReplicas(ks).ranges()) addRangeForEndpoint(range, FBUtilities.getBroadcastAddressAndPort()); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java index 044cd9f..c4ccf48 100644 --- a/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/LegacySSTableTest.java @@ -176,21 +176,26 @@ public class LegacySSTableTest { for (SSTableReader sstable : cfs.getLiveSSTables()) { - sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, 1234, NO_PENDING_REPAIR); + sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, 1234, NO_PENDING_REPAIR, false); sstable.reloadSSTableMetadata(); assertEquals(1234, sstable.getRepairedAt()); if (sstable.descriptor.version.hasPendingRepair()) assertEquals(NO_PENDING_REPAIR, sstable.getPendingRepair()); } + boolean isTransient = false; for (SSTableReader sstable : cfs.getLiveSSTables()) { UUID random = UUID.randomUUID(); - sstable.descriptor.getMetadataSerializer().mutateRepaired(sstable.descriptor, UNREPAIRED_SSTABLE, random); + sstable.descriptor.getMetadataSerializer().mutateRepairMetadata(sstable.descriptor, UNREPAIRED_SSTABLE, random, isTransient); sstable.reloadSSTableMetadata(); assertEquals(UNREPAIRED_SSTABLE, sstable.getRepairedAt()); if (sstable.descriptor.version.hasPendingRepair()) assertEquals(random, sstable.getPendingRepair()); + if (sstable.descriptor.version.hasIsTransient()) + assertEquals(isTransient, sstable.isTransient()); + + isTransient = !isTransient; } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java 
---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java index 8509115..5d40f8c 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableLoaderTest.java @@ -31,14 +31,13 @@ import org.junit.Test; import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; +import org.apache.cassandra.locator.Replica; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.TableMetadataRef; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.db.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.marshal.AsciiType; -import org.apache.cassandra.dht.Range; -import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.schema.KeyspaceParams; @@ -109,8 +108,8 @@ public class SSTableLoaderTest public void init(String keyspace) { this.keyspace = keyspace; - for (Range<Token> range : StorageService.instance.getLocalRanges(KEYSPACE1)) - addRangeForEndpoint(range, FBUtilities.getBroadcastAddressAndPort()); + for (Replica replica : StorageService.instance.getLocalReplicas(KEYSPACE1)) + addRangeForEndpoint(replica.range(), FBUtilities.getBroadcastAddressAndPort()); } public TableMetadataRef getTableMetadata(String tableName) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java index 6412ef4..7c47c8b 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java +++ 
b/test/unit/org/apache/cassandra/io/sstable/SSTableRewriterTest.java @@ -877,7 +877,7 @@ public class SSTableRewriterTest extends SSTableWriterTestBase File dir = cfs.getDirectories().getDirectoryForNewSSTables(); Descriptor desc = cfs.newSSTableDescriptor(dir); - try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, desc, 0, 0, null, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS))) + try (SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, desc, 0, 0, null, false, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS))) { int end = f == fileCount - 1 ? partitionCount : ((f + 1) * partitionCount) / fileCount; for ( ; i < end ; i++) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java index 441a3b9..731cee2 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java @@ -219,7 +219,7 @@ public class SSTableUtils TableMetadata metadata = Schema.instance.getTableMetadata(ksname, cfname); ColumnFamilyStore cfs = Schema.instance.getColumnFamilyStoreInstance(metadata.id); SerializationHeader header = appender.header(); - SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, Descriptor.fromFilename(datafile.getAbsolutePath()), expectedSize, UNREPAIRED_SSTABLE, NO_PENDING_REPAIR, 0, header); + SSTableTxnWriter writer = SSTableTxnWriter.create(cfs, Descriptor.fromFilename(datafile.getAbsolutePath()), expectedSize, UNREPAIRED_SSTABLE, NO_PENDING_REPAIR, false, 0, header); while (appender.append(writer)) { /* pass */ } Collection<SSTableReader> readers = writer.finish(true); 
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java index 5d62cdb..31d0b89 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTest.java @@ -20,6 +20,7 @@ package org.apache.cassandra.io.sstable; import java.io.File; import java.nio.ByteBuffer; +import java.util.UUID; import org.junit.Test; @@ -32,9 +33,12 @@ import org.apache.cassandra.db.rows.*; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.format.SSTableReadsListener; import org.apache.cassandra.io.sstable.format.SSTableWriter; +import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.utils.FBUtilities; import static junit.framework.Assert.fail; +import static org.apache.cassandra.service.ActiveRepairService.NO_PENDING_REPAIR; +import static org.apache.cassandra.service.ActiveRepairService.UNREPAIRED_SSTABLE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -245,4 +249,60 @@ public class SSTableWriterTest extends SSTableWriterTestBase } } + private static void assertValidRepairMetadata(long repairedAt, UUID pendingRepair, boolean isTransient) + { + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_SMALL_MAX_VALUE); + File dir = cfs.getDirectories().getDirectoryForNewSSTables(); + LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.STREAM); + + try (SSTableWriter writer = getWriter(cfs, dir, txn, repairedAt, pendingRepair, isTransient)) + { + // expected + } + catch (IllegalArgumentException e) + { + throw new AssertionError("Unexpected 
IllegalArgumentException", e); + } + + txn.abort(); + LifecycleTransaction.waitForDeletions(); + } + + private static void assertInvalidRepairMetadata(long repairedAt, UUID pendingRepair, boolean isTransient) + { + Keyspace keyspace = Keyspace.open(KEYSPACE); + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_SMALL_MAX_VALUE); + File dir = cfs.getDirectories().getDirectoryForNewSSTables(); + LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.STREAM); + + try (SSTableWriter writer = getWriter(cfs, dir, txn, repairedAt, pendingRepair, isTransient)) + { + fail("Expected IllegalArgumentException"); + } + catch (IllegalArgumentException e) + { + // expected + } + + txn.abort(); + LifecycleTransaction.waitForDeletions(); + } + + /** + * It should only be possible to create sstables marked transient that also have a pending repair + */ + @Test + public void testRepairMetadataValidation() + { + assertValidRepairMetadata(UNREPAIRED_SSTABLE, NO_PENDING_REPAIR, false); + assertValidRepairMetadata(1, NO_PENDING_REPAIR, false); + assertValidRepairMetadata(UNREPAIRED_SSTABLE, UUID.randomUUID(), false); + assertValidRepairMetadata(UNREPAIRED_SSTABLE, UUID.randomUUID(), true); + + assertInvalidRepairMetadata(UNREPAIRED_SSTABLE, NO_PENDING_REPAIR, true); + assertInvalidRepairMetadata(1, UUID.randomUUID(), false); + assertInvalidRepairMetadata(1, NO_PENDING_REPAIR, true); + + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java index d42c49b..962e1a1 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableWriterTestBase.java @@ -22,6 +22,7 @@ import java.io.File; import 
java.nio.ByteBuffer; import java.util.HashSet; import java.util.Set; +import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; @@ -48,6 +49,7 @@ import org.apache.cassandra.utils.FBUtilities; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class SSTableWriterTestBase extends SchemaLoader { @@ -161,10 +163,15 @@ public class SSTableWriterTestBase extends SchemaLoader assertFalse(CompactionManager.instance.submitMaximal(cfs, cfs.gcBefore((int) (System.currentTimeMillis() / 1000)), false).isEmpty()); } - public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn) + public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn, long repairedAt, UUID pendingRepair, boolean isTransient) { Descriptor desc = cfs.newSSTableDescriptor(directory); - return SSTableWriter.create(desc, 0, 0, null, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS), cfs.indexManager.listIndexes(), txn); + return SSTableWriter.create(desc, 0, repairedAt, pendingRepair, isTransient, new SerializationHeader(true, cfs.metadata(), cfs.metadata().regularAndStaticColumns(), EncodingStats.NO_STATS), cfs.indexManager.listIndexes(), txn); + } + + public static SSTableWriter getWriter(ColumnFamilyStore cfs, File directory, LifecycleTransaction txn) + { + return getWriter(cfs, directory, txn, 0, null, false); } public static ByteBuffer random(int i, int size) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java 
b/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java index 9aa4e28..aea3b4a 100644 --- a/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/format/SSTableFlushObserverTest.java @@ -96,7 +96,7 @@ public class SSTableFlushObserverTest KS_NAME, CF_NAME, 0, sstableFormat), - 10L, 0L, null, TableMetadataRef.forOfflineTools(cfm), + 10L, 0L, null, false, TableMetadataRef.forOfflineTools(cfm), new MetadataCollector(cfm.comparator).sstableLevel(0), new SerializationHeader(true, cfm, cfm.regularAndStaticColumns(), EncodingStats.NO_STATS), Collections.singletonList(observer), http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java index 8ab1511..f109d8f 100644 --- a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java @@ -99,7 +99,7 @@ public class MetadataSerializerTest String partitioner = RandomPartitioner.class.getCanonicalName(); double bfFpChance = 0.1; - return collector.finalizeMetadata(partitioner, bfFpChance, 0, null, SerializationHeader.make(cfm, Collections.emptyList())); + return collector.finalizeMetadata(partitioner, bfFpChance, 0, null, false, SerializationHeader.make(cfm, Collections.emptyList())); } @Test http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java 
b/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java index 202d7f1..bf1d940 100644 --- a/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java +++ b/test/unit/org/apache/cassandra/locator/DynamicEndpointSnitchTest.java @@ -50,6 +50,16 @@ public class DynamicEndpointSnitchTest Thread.sleep(150); } + private static EndpointsForRange full(InetAddressAndPort... endpoints) + { + EndpointsForRange.Builder rlist = EndpointsForRange.builder(ReplicaUtils.FULL_RANGE, endpoints.length); + for (InetAddressAndPort endpoint: endpoints) + { + rlist.add(ReplicaUtils.full(endpoint)); + } + return rlist.build(); + } + @Test public void testSnitch() throws InterruptedException, IOException, ConfigurationException { @@ -66,41 +76,41 @@ public class DynamicEndpointSnitchTest // first, make all hosts equal setScores(dsnitch, 1, hosts, 10, 10, 10); - List<InetAddressAndPort> order = Arrays.asList(host1, host2, host3); - assertEquals(order, dsnitch.getSortedListByProximity(self, Arrays.asList(host1, host2, host3))); + EndpointsForRange order = full(host1, host2, host3); + assertEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); // make host1 a little worse setScores(dsnitch, 1, hosts, 20, 10, 10); - order = Arrays.asList(host2, host3, host1); - assertEquals(order, dsnitch.getSortedListByProximity(self, Arrays.asList(host1, host2, host3))); + order = full(host2, host3, host1); + assertEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); // make host2 as bad as host1 setScores(dsnitch, 2, hosts, 15, 20, 10); - order = Arrays.asList(host3, host1, host2); - assertEquals(order, dsnitch.getSortedListByProximity(self, Arrays.asList(host1, host2, host3))); + order = full(host3, host1, host2); + assertEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); // make host3 the worst setScores(dsnitch, 3, hosts, 10, 10, 30); - order = Arrays.asList(host1, host2, host3); - assertEquals(order, 
dsnitch.getSortedListByProximity(self, Arrays.asList(host1, host2, host3))); + order = full(host1, host2, host3); + assertEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); // make host3 equal to the others setScores(dsnitch, 5, hosts, 10, 10, 10); - order = Arrays.asList(host1, host2, host3); - assertEquals(order, dsnitch.getSortedListByProximity(self, Arrays.asList(host1, host2, host3))); + order = full(host1, host2, host3); + assertEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); /// Tests CASSANDRA-6683 improvements // make the scores differ enough from the ideal order that we sort by score; under the old // dynamic snitch behavior (where we only compared neighbors), these wouldn't get sorted setScores(dsnitch, 20, hosts, 10, 70, 20); - order = Arrays.asList(host1, host3, host2); - assertEquals(order, dsnitch.getSortedListByProximity(self, Arrays.asList(host1, host2, host3))); + order = full(host1, host3, host2); + assertEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3))); - order = Arrays.asList(host4, host1, host3, host2); - assertEquals(order, dsnitch.getSortedListByProximity(self, Arrays.asList(host1, host2, host3, host4))); + order = full(host4, host1, host3, host2); + assertEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3, host4))); setScores(dsnitch, 20, hosts, 10, 10, 10); - order = Arrays.asList(host4, host1, host2, host3); - assertEquals(order, dsnitch.getSortedListByProximity(self, Arrays.asList(host1, host2, host3, host4))); + order = full(host4, host1, host2, host3); + assertEquals(order, dsnitch.sortedByProximity(self, full(host1, host2, host3, host4))); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java 
b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java index ab6c6cd..5f6e26f 100644 --- a/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java +++ b/test/unit/org/apache/cassandra/locator/NetworkTopologyStrategyTest.java @@ -25,6 +25,7 @@ import java.util.stream.Collectors; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import com.google.common.collect.Multimap; import org.junit.Assert; @@ -36,12 +37,17 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Murmur3Partitioner.LongToken; import org.apache.cassandra.dht.OrderPreservingPartitioner.StringToken; +import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.locator.TokenMetadata.Topology; import org.apache.cassandra.service.StorageService; +import static org.apache.cassandra.locator.Replica.fullReplica; +import static org.apache.cassandra.locator.Replica.transientReplica; + public class NetworkTopologyStrategyTest { private String keyspaceName = "Keyspace1"; @@ -51,6 +57,7 @@ public class NetworkTopologyStrategyTest public static void setupDD() { DatabaseDescriptor.daemonInitialization(); + DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true); } @Test @@ -68,13 +75,14 @@ public class NetworkTopologyStrategyTest // Set the localhost to the tokenmetadata. Embedded cassandra way? 
NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(keyspaceName, metadata, snitch, configOptions); - assert strategy.getReplicationFactor("DC1") == 3; - assert strategy.getReplicationFactor("DC2") == 2; - assert strategy.getReplicationFactor("DC3") == 1; + assert strategy.getReplicationFactor("DC1").allReplicas == 3; + assert strategy.getReplicationFactor("DC2").allReplicas == 2; + assert strategy.getReplicationFactor("DC3").allReplicas == 1; // Query for the natural hosts - ArrayList<InetAddressAndPort> endpoints = strategy.getNaturalEndpoints(new StringToken("123")); - assert 6 == endpoints.size(); - assert 6 == new HashSet<>(endpoints).size(); // ensure uniqueness + EndpointsForToken replicas = strategy.getNaturalReplicasForToken(new StringToken("123")); + assert 6 == replicas.size(); + assert 6 == replicas.endpoints().size(); // ensure uniqueness + assert 6 == new HashSet<>(replicas.byEndpoint().values()).size(); // ensure uniqueness } @Test @@ -92,13 +100,14 @@ public class NetworkTopologyStrategyTest // Set the localhost to the tokenmetadata. Embedded cassandra way? 
NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(keyspaceName, metadata, snitch, configOptions); - assert strategy.getReplicationFactor("DC1") == 3; - assert strategy.getReplicationFactor("DC2") == 3; - assert strategy.getReplicationFactor("DC3") == 0; + assert strategy.getReplicationFactor("DC1").allReplicas == 3; + assert strategy.getReplicationFactor("DC2").allReplicas == 3; + assert strategy.getReplicationFactor("DC3").allReplicas == 0; // Query for the natural hosts - ArrayList<InetAddressAndPort> endpoints = strategy.getNaturalEndpoints(new StringToken("123")); - assert 6 == endpoints.size(); - assert 6 == new HashSet<>(endpoints).size(); // ensure uniqueness + EndpointsForToken replicas = strategy.getNaturalReplicasForToken(new StringToken("123")); + assert 6 == replicas.size(); + assert 6 == replicas.endpoints().size(); // ensure uniqueness + assert 6 == new HashSet<>(replicas.byEndpoint().values()).size(); // ensure uniqueness } @Test @@ -137,12 +146,13 @@ public class NetworkTopologyStrategyTest for (String testToken : new String[]{"123456", "200000", "000402", "ffffff", "400200"}) { - List<InetAddressAndPort> endpoints = strategy.calculateNaturalEndpoints(new StringToken(testToken), metadata); - Set<InetAddressAndPort> epSet = new HashSet<>(endpoints); + EndpointsForRange replicas = strategy.calculateNaturalReplicas(new StringToken(testToken), metadata); + Set<InetAddressAndPort> endpointSet = replicas.endpoints(); - Assert.assertEquals(totalRF, endpoints.size()); - Assert.assertEquals(totalRF, epSet.size()); - logger.debug("{}: {}", testToken, endpoints); + Assert.assertEquals(totalRF, replicas.size()); + Assert.assertEquals(totalRF, new HashSet<>(replicas.byEndpoint().values()).size()); + Assert.assertEquals(totalRF, endpointSet.size()); + logger.debug("{}: {}", testToken, replicas); } } @@ -209,7 +219,7 @@ public class NetworkTopologyStrategyTest { Token token = Murmur3Partitioner.instance.getRandomToken(rand); List<InetAddressAndPort> 
expected = calculateNaturalEndpoints(token, tokenMetadata, datacenters, snitch); - List<InetAddressAndPort> actual = nts.calculateNaturalEndpoints(token, tokenMetadata); + List<InetAddressAndPort> actual = new ArrayList<>(nts.calculateNaturalReplicas(token, tokenMetadata).endpoints()); if (endpointsDiffer(expected, actual)) { System.err.println("Endpoints mismatch for token " + token); @@ -373,4 +383,50 @@ public class NetworkTopologyStrategyTest Integer replicas = datacenters.get(dc); return replicas == null ? 0 : replicas; } + + private static Token tk(long t) + { + return new LongToken(t); + } + + private static Range<Token> range(long l, long r) + { + return new Range<>(tk(l), tk(r)); + } + + @Test + public void testTransientReplica() throws Exception + { + IEndpointSnitch snitch = new SimpleSnitch(); + DatabaseDescriptor.setEndpointSnitch(snitch); + + List<InetAddressAndPort> endpoints = Lists.newArrayList(InetAddressAndPort.getByName("127.0.0.1"), + InetAddressAndPort.getByName("127.0.0.2"), + InetAddressAndPort.getByName("127.0.0.3"), + InetAddressAndPort.getByName("127.0.0.4")); + + Multimap<InetAddressAndPort, Token> tokens = HashMultimap.create(); + tokens.put(endpoints.get(0), tk(100)); + tokens.put(endpoints.get(1), tk(200)); + tokens.put(endpoints.get(2), tk(300)); + tokens.put(endpoints.get(3), tk(400)); + TokenMetadata metadata = new TokenMetadata(); + metadata.updateNormalTokens(tokens); + + Map<String, String> configOptions = new HashMap<String, String>(); + configOptions.put(snitch.getDatacenter((InetAddressAndPort) null), "3/1"); + + NetworkTopologyStrategy strategy = new NetworkTopologyStrategy(keyspaceName, metadata, snitch, configOptions); + + Assert.assertEquals(EndpointsForRange.of(fullReplica(endpoints.get(0), range(400, 100)), + fullReplica(endpoints.get(1), range(400, 100)), + transientReplica(endpoints.get(2), range(400, 100))), + strategy.getNaturalReplicasForToken(tk(99))); + + + 
Assert.assertEquals(EndpointsForRange.of(fullReplica(endpoints.get(1), range(100, 200)), + fullReplica(endpoints.get(2), range(100, 200)), + transientReplica(endpoints.get(3), range(100, 200))), + strategy.getNaturalReplicasForToken(tk(101))); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org