Repository: cassandra Updated Branches: refs/heads/trunk 5b645de13 -> f7431b432
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java b/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java new file mode 100644 index 0000000..8119400 --- /dev/null +++ b/test/unit/org/apache/cassandra/service/reads/DataResolverTransientTest.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.reads; + +import java.util.concurrent.TimeUnit; + +import com.google.common.primitives.Ints; + +import org.apache.cassandra.Util; +import org.apache.cassandra.db.DecoratedKey; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.EmptyIterators; +import org.apache.cassandra.db.RangeTombstone; +import org.apache.cassandra.db.SimpleBuilders; +import org.apache.cassandra.db.SinglePartitionReadCommand; +import org.apache.cassandra.db.Slice; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.BTreeRow; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.locator.EndpointsForToken; +import org.apache.cassandra.locator.ReplicaLayout; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.reads.repair.TestableReadRepair; +import org.apache.cassandra.utils.ByteBufferUtil; + +import static org.apache.cassandra.db.ConsistencyLevel.QUORUM; +import static org.apache.cassandra.locator.Replica.fullReplica; +import static org.apache.cassandra.locator.Replica.transientReplica; +import static org.apache.cassandra.locator.ReplicaUtils.full; +import static org.apache.cassandra.locator.ReplicaUtils.trans; + +/** + * Tests DataResolver's handling of transient replicas + */ +public class DataResolverTransientTest extends AbstractReadResponseTest +{ + private static DecoratedKey key; + + @Before + public void setUp() + { + key = Util.dk("key1"); + } + + private static PartitionUpdate.Builder update(TableMetadata metadata, String key, Row... 
rows) + { + PartitionUpdate.Builder builder = new PartitionUpdate.Builder(metadata, dk(key), metadata.regularAndStaticColumns(), rows.length, false); + for (Row row: rows) + { + builder.add(row); + } + return builder; + } + + private static PartitionUpdate.Builder update(Row... rows) + { + return update(cfm, "key1", rows); + } + + private static Row.SimpleBuilder rowBuilder(int clustering) + { + return new SimpleBuilders.RowBuilder(cfm, Integer.toString(clustering)); + } + + private static Row row(long timestamp, int clustering, int value) + { + return rowBuilder(clustering).timestamp(timestamp).add("c1", Integer.toString(value)).build(); + } + + private static DeletionTime deletion(long timeMillis) + { + TimeUnit MILLIS = TimeUnit.MILLISECONDS; + return new DeletionTime(MILLIS.toMicros(timeMillis), Ints.checkedCast(MILLIS.toSeconds(timeMillis))); + } + + /** + * Tests that the given update doesn't cause data resolver to attempt to repair a transient replica + */ + private void assertNoTransientRepairs(PartitionUpdate update) + { + SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(update.metadata(), nowInSec, key); + EndpointsForToken targetReplicas = EndpointsForToken.of(key.getToken(), full(EP1), full(EP2), trans(EP3)); + TestableReadRepair repair = new TestableReadRepair(command, QUORUM); + DataResolver resolver = new DataResolver(command, plan(targetReplicas, ConsistencyLevel.QUORUM), repair, 0); + + Assert.assertFalse(resolver.isDataPresent()); + resolver.preprocess(response(command, EP1, iter(update), false)); + resolver.preprocess(response(command, EP2, iter(update), false)); + resolver.preprocess(response(command, EP3, EmptyIterators.unfilteredPartition(update.metadata()), false)); + + Assert.assertFalse(repair.dataWasConsumed()); + assertPartitionsEqual(filter(iter(update)), resolver.resolve()); + Assert.assertTrue(repair.dataWasConsumed()); + Assert.assertTrue(repair.sent.toString(), repair.sent.isEmpty()); + } + + @Test + 
public void emptyRowRepair() + { + assertNoTransientRepairs(update(row(1000, 4, 4), row(1000, 5, 5)).build()); + } + + @Test + public void emptyPartitionDeletionRepairs() + { + PartitionUpdate.Builder builder = update(); + builder.addPartitionDeletion(deletion(1999)); + assertNoTransientRepairs(builder.build()); + } + + /** + * Partition level deletion responses shouldn't send data to a transient replica + */ + @Test + public void emptyRowDeletionRepairs() + { + PartitionUpdate.Builder builder = update(); + builder.add(rowBuilder(1).timestamp(1999).delete().build()); + assertNoTransientRepairs(builder.build()); + } + + @Test + public void emptyComplexDeletionRepair() + { + + long[] ts = {1000, 2000}; + + Row.Builder builder = BTreeRow.unsortedBuilder(); + builder.newRow(Clustering.EMPTY); + builder.addComplexDeletion(m, new DeletionTime(ts[0] - 1, nowInSec)); + assertNoTransientRepairs(update(cfm2, "key", builder.build()).build()); + + } + + @Test + public void emptyRangeTombstoneRepairs() + { + Slice slice = Slice.make(Clustering.make(ByteBufferUtil.bytes("a")), Clustering.make(ByteBufferUtil.bytes("b"))); + PartitionUpdate.Builder builder = update(); + builder.add(new RangeTombstone(slice, deletion(2000))); + assertNoTransientRepairs(builder.build()); + } + + /** + * If the full replicas need to repair each other, repairs shouldn't be sent to transient replicas + */ + @Test + public void fullRepairsIgnoreTransientReplicas() + { + SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk(5)); + EndpointsForToken targetReplicas = EndpointsForToken.of(key.getToken(), full(EP1), full(EP2), trans(EP3)); + TestableReadRepair repair = new TestableReadRepair(command, QUORUM); + DataResolver resolver = new DataResolver(command, plan(targetReplicas, QUORUM), repair, 0); + + Assert.assertFalse(resolver.isDataPresent()); + resolver.preprocess(response(command, EP1, iter(update(row(1000, 5, 5)).build()), false)); + 
resolver.preprocess(response(command, EP2, iter(update(row(2000, 4, 4)).build()), false)); + resolver.preprocess(response(command, EP3, EmptyIterators.unfilteredPartition(cfm), false)); + + Assert.assertFalse(repair.dataWasConsumed()); + + consume(resolver.resolve()); + + Assert.assertTrue(repair.dataWasConsumed()); + + Assert.assertTrue(repair.sent.containsKey(EP1)); + Assert.assertTrue(repair.sent.containsKey(EP2)); + Assert.assertFalse(repair.sent.containsKey(EP3)); + } + + /** + * If the transient replica has new data, the full replicas should be repaired, the transient one should not + */ + @Test + public void transientMismatchesRepairFullReplicas() + { + SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk(5)); + EndpointsForToken targetReplicas = EndpointsForToken.of(key.getToken(), full(EP1), full(EP2), trans(EP3)); + TestableReadRepair<?, ?> repair = new TestableReadRepair(command, QUORUM); + DataResolver resolver = new DataResolver(command, plan(targetReplicas, QUORUM), repair, 0); + + Assert.assertFalse(resolver.isDataPresent()); + PartitionUpdate transData = update(row(1000, 5, 5)).build(); + resolver.preprocess(response(command, EP1, EmptyIterators.unfilteredPartition(cfm), false)); + resolver.preprocess(response(command, EP2, EmptyIterators.unfilteredPartition(cfm), false)); + resolver.preprocess(response(command, EP3, iter(transData), false)); + + Assert.assertFalse(repair.dataWasConsumed()); + + assertPartitionsEqual(filter(iter(transData)), resolver.resolve()); + + Assert.assertTrue(repair.dataWasConsumed()); + + assertPartitionsEqual(filter(iter(transData)), filter(iter(repair.sent.get(EP1).getPartitionUpdate(cfm)))); + assertPartitionsEqual(filter(iter(transData)), filter(iter(repair.sent.get(EP2).getPartitionUpdate(cfm)))); + Assert.assertFalse(repair.sent.containsKey(EP3)); + + } + + private ReplicaLayout.ForToken plan(EndpointsForToken replicas, ConsistencyLevel consistencyLevel) + { + return new 
ReplicaLayout.ForToken(ks, consistencyLevel, replicas.token(), replicas, null, replicas); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java new file mode 100644 index 0000000..5306a74 --- /dev/null +++ b/test/unit/org/apache/cassandra/service/reads/DigestResolverTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.reads; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.SimpleBuilders; +import org.apache.cassandra.db.SinglePartitionReadCommand; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.locator.EndpointsForToken; +import org.apache.cassandra.locator.ReplicaLayout; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.reads.repair.NoopReadRepair; +import org.apache.cassandra.service.reads.repair.TestableReadRepair; + +import static org.apache.cassandra.locator.ReplicaUtils.full; +import static org.apache.cassandra.locator.ReplicaUtils.trans; + +public class DigestResolverTest extends AbstractReadResponseTest +{ + private static PartitionUpdate.Builder update(TableMetadata metadata, String key, Row... rows) + { + PartitionUpdate.Builder builder = new PartitionUpdate.Builder(metadata, dk(key), metadata.regularAndStaticColumns(), rows.length, false); + for (Row row: rows) + { + builder.add(row); + } + return builder; + } + + private static PartitionUpdate.Builder update(Row... 
rows) + { + return update(cfm, "key1", rows); + } + + private static Row row(long timestamp, int clustering, int value) + { + SimpleBuilders.RowBuilder builder = new SimpleBuilders.RowBuilder(cfm, Integer.toString(clustering)); + builder.timestamp(timestamp).add("c1", Integer.toString(value)); + return builder.build(); + } + + @Test + public void noRepairNeeded() + { + SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk); + EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), full(EP1), full(EP2)); + TestableReadRepair readRepair = new TestableReadRepair(command, ConsistencyLevel.QUORUM); + DigestResolver resolver = new DigestResolver(command, plan(ConsistencyLevel.QUORUM, targetReplicas), readRepair, 0); + + PartitionUpdate response = update(row(1000, 4, 4), row(1000, 5, 5)).build(); + + Assert.assertFalse(resolver.isDataPresent()); + resolver.preprocess(response(command, EP2, iter(response), true)); + resolver.preprocess(response(command, EP1, iter(response), false)); + Assert.assertTrue(resolver.isDataPresent()); + Assert.assertTrue(resolver.responsesMatch()); + + assertPartitionsEqual(filter(iter(response)), resolver.getData()); + } + + @Test + public void digestMismatch() + { + SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk); + EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), full(EP1), full(EP2)); + DigestResolver resolver = new DigestResolver(command, plan(ConsistencyLevel.QUORUM, targetReplicas), NoopReadRepair.instance,0); + + PartitionUpdate response1 = update(row(1000, 4, 4), row(1000, 5, 5)).build(); + PartitionUpdate response2 = update(row(2000, 4, 5)).build(); + + Assert.assertFalse(resolver.isDataPresent()); + resolver.preprocess(response(command, EP2, iter(response1), true)); + resolver.preprocess(response(command, EP1, iter(response2), false)); + Assert.assertTrue(resolver.isDataPresent()); + 
Assert.assertFalse(resolver.responsesMatch()); + Assert.assertFalse(resolver.hasTransientResponse()); + } + + /** + * A full response and a transient response, with the transient response being a subset of the full one + */ + @Test + public void agreeingTransient() + { + SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk); + EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), full(EP1), trans(EP2)); + TestableReadRepair readRepair = new TestableReadRepair(command, ConsistencyLevel.QUORUM); + DigestResolver resolver = new DigestResolver(command, plan(ConsistencyLevel.QUORUM, targetReplicas), readRepair, 0); + + PartitionUpdate response1 = update(row(1000, 4, 4), row(1000, 5, 5)).build(); + PartitionUpdate response2 = update(row(1000, 5, 5)).build(); + + Assert.assertFalse(resolver.isDataPresent()); + resolver.preprocess(response(command, EP1, iter(response1), false)); + resolver.preprocess(response(command, EP2, iter(response2), false)); + Assert.assertTrue(resolver.isDataPresent()); + Assert.assertTrue(resolver.responsesMatch()); + Assert.assertTrue(resolver.hasTransientResponse()); + Assert.assertTrue(readRepair.sent.isEmpty()); + } + + /** + * Transient responses shouldn't be classified as the single dataResponse + */ + @Test + public void transientResponse() + { + SinglePartitionReadCommand command = SinglePartitionReadCommand.fullPartitionRead(cfm, nowInSec, dk); + EndpointsForToken targetReplicas = EndpointsForToken.of(dk.getToken(), full(EP1), trans(EP2)); + DigestResolver resolver = new DigestResolver(command, plan(ConsistencyLevel.QUORUM, targetReplicas), NoopReadRepair.instance, 0); + + PartitionUpdate response2 = update(row(1000, 5, 5)).build(); + Assert.assertFalse(resolver.isDataPresent()); + Assert.assertFalse(resolver.hasTransientResponse()); + resolver.preprocess(response(command, EP2, iter(response2), false)); + Assert.assertFalse(resolver.isDataPresent()); + 
Assert.assertTrue(resolver.hasTransientResponse()); + } + + private ReplicaLayout.ForToken plan(ConsistencyLevel consistencyLevel, EndpointsForToken replicas) + { + return new ReplicaLayout.ForToken(ks, consistencyLevel, replicas.token(), replicas, null, replicas); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java index de7b2e4..3b102f2 100644 --- a/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java +++ b/test/unit/org/apache/cassandra/service/reads/ReadExecutorTest.java @@ -18,10 +18,12 @@ package org.apache.cassandra.service.reads; -import java.util.List; import java.util.concurrent.TimeUnit; -import com.google.common.collect.ImmutableList; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.EndpointsForRange; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; @@ -35,12 +37,15 @@ import org.apache.cassandra.db.SinglePartitionReadCommand; import org.apache.cassandra.exceptions.ReadFailureException; import org.apache.cassandra.exceptions.ReadTimeoutException; import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.locator.EndpointsForToken; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.ReplicaLayout; +import org.apache.cassandra.locator.ReplicaUtils; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.KeyspaceParams; -import org.apache.cassandra.service.reads.AbstractReadExecutor; +import static 
org.apache.cassandra.locator.ReplicaUtils.full; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -48,7 +53,8 @@ public class ReadExecutorTest { static Keyspace ks; static ColumnFamilyStore cfs; - static List<InetAddressAndPort> targets; + static EndpointsForToken targets; + static Token dummy; @BeforeClass public static void setUpClass() throws Throwable @@ -57,8 +63,13 @@ public class ReadExecutorTest SchemaLoader.createKeyspace("Foo", KeyspaceParams.simple(3), SchemaLoader.standardCFMD("Foo", "Bar")); ks = Keyspace.open("Foo"); cfs = ks.getColumnFamilyStore("Bar"); - targets = ImmutableList.of(InetAddressAndPort.getByName("127.0.0.255"), InetAddressAndPort.getByName("127.0.0.254"), InetAddressAndPort.getByName("127.0.0.253")); - cfs.sampleLatencyNanos = 0; + dummy = Murmur3Partitioner.instance.getMinimumToken(); + targets = EndpointsForToken.of(dummy, + full(InetAddressAndPort.getByName("127.0.0.255")), + full(InetAddressAndPort.getByName("127.0.0.254")), + full(InetAddressAndPort.getByName("127.0.0.253")) + ); + cfs.sampleReadLatencyNanos = 0; } @Before @@ -78,7 +89,7 @@ public class ReadExecutorTest { assertEquals(0, cfs.metric.speculativeInsufficientReplicas.getCount()); assertEquals(0, ks.metric.speculativeInsufficientReplicas.getCount()); - AbstractReadExecutor executor = new AbstractReadExecutor.NeverSpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime(), true); + AbstractReadExecutor executor = new AbstractReadExecutor.NeverSpeculatingReadExecutor(cfs, new MockSinglePartitionReadCommand(), plan(targets, ConsistencyLevel.LOCAL_QUORUM), System.nanoTime(), true); executor.maybeTryAdditionalReplicas(); try { @@ -93,7 +104,7 @@ public class ReadExecutorTest assertEquals(1, ks.metric.speculativeInsufficientReplicas.getCount()); //Shouldn't increment - executor = new AbstractReadExecutor.NeverSpeculatingReadExecutor(ks, cfs, new 
MockSinglePartitionReadCommand(), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime(), false); + executor = new AbstractReadExecutor.NeverSpeculatingReadExecutor(cfs, new MockSinglePartitionReadCommand(), plan(targets, ConsistencyLevel.LOCAL_QUORUM), System.nanoTime(), false); executor.maybeTryAdditionalReplicas(); try { @@ -119,7 +130,7 @@ public class ReadExecutorTest assertEquals(0, cfs.metric.speculativeFailedRetries.getCount()); assertEquals(0, ks.metric.speculativeRetries.getCount()); assertEquals(0, ks.metric.speculativeFailedRetries.getCount()); - AbstractReadExecutor executor = new AbstractReadExecutor.SpeculatingReadExecutor(ks, cfs, new MockSinglePartitionReadCommand(TimeUnit.DAYS.toMillis(365)), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime()); + AbstractReadExecutor executor = new AbstractReadExecutor.SpeculatingReadExecutor(cfs, new MockSinglePartitionReadCommand(TimeUnit.DAYS.toMillis(365)), plan(ConsistencyLevel.LOCAL_QUORUM, targets, targets.subList(0, 2)), System.nanoTime()); executor.maybeTryAdditionalReplicas(); new Thread() { @@ -127,8 +138,8 @@ public class ReadExecutorTest public void run() { //Failures end the read promptly but don't require mock data to be suppleid - executor.handler.onFailure(targets.get(0), RequestFailureReason.READ_TOO_MANY_TOMBSTONES); - executor.handler.onFailure(targets.get(1), RequestFailureReason.READ_TOO_MANY_TOMBSTONES); + executor.handler.onFailure(targets.get(0).endpoint(), RequestFailureReason.READ_TOO_MANY_TOMBSTONES); + executor.handler.onFailure(targets.get(1).endpoint(), RequestFailureReason.READ_TOO_MANY_TOMBSTONES); executor.handler.condition.signalAll(); } }.start(); @@ -160,7 +171,7 @@ public class ReadExecutorTest assertEquals(0, cfs.metric.speculativeFailedRetries.getCount()); assertEquals(0, ks.metric.speculativeRetries.getCount()); assertEquals(0, ks.metric.speculativeFailedRetries.getCount()); - AbstractReadExecutor executor = new AbstractReadExecutor.SpeculatingReadExecutor(ks, 
cfs, new MockSinglePartitionReadCommand(), ConsistencyLevel.LOCAL_QUORUM, targets, System.nanoTime()); + AbstractReadExecutor executor = new AbstractReadExecutor.SpeculatingReadExecutor(cfs, new MockSinglePartitionReadCommand(), plan(ConsistencyLevel.LOCAL_QUORUM, targets, targets.subList(0, 2)), System.nanoTime()); executor.maybeTryAdditionalReplicas(); try { @@ -188,7 +199,7 @@ public class ReadExecutorTest MockSinglePartitionReadCommand(long timeout) { - super(false, 0, cfs.metadata(), 0, null, null, null, Util.dk("ry@n_luvs_teh_y@nk33z"), null, null); + super(false, 0, false, cfs.metadata(), 0, null, null, null, Util.dk("ry@n_luvs_teh_y@nk33z"), null, null); this.timeout = timeout; } @@ -213,4 +224,13 @@ public class ReadExecutorTest } + private ReplicaLayout.ForToken plan(EndpointsForToken targets, ConsistencyLevel consistencyLevel) + { + return plan(consistencyLevel, targets, targets); + } + + private ReplicaLayout.ForToken plan(ConsistencyLevel consistencyLevel, EndpointsForToken natural, EndpointsForToken selected) + { + return new ReplicaLayout.ForToken(ks, consistencyLevel, natural.token(), natural, null, selected); + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java index 9717c4e..7e6ee29 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/AbstractReadRepairTest.java @@ -8,7 +8,6 @@ import java.util.function.Consumer; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; import com.google.common.collect.Sets; import 
com.google.common.primitives.Ints; import org.junit.Assert; @@ -39,7 +38,11 @@ import org.apache.cassandra.db.rows.BufferCell; import org.apache.cassandra.db.rows.Cell; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.db.rows.RowIterator; +import org.apache.cassandra.locator.EndpointsForRange; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.ReplicaLayout; +import org.apache.cassandra.locator.ReplicaUtils; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.KeyspaceMetadata; @@ -49,6 +52,9 @@ import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.Tables; import org.apache.cassandra.utils.ByteBufferUtil; +import static org.apache.cassandra.locator.Replica.fullReplica; +import static org.apache.cassandra.locator.ReplicaUtils.FULL_RANGE; + @Ignore public abstract class AbstractReadRepairTest { @@ -60,6 +66,12 @@ public abstract class AbstractReadRepairTest static InetAddressAndPort target3; static List<InetAddressAndPort> targets; + static Replica replica1; + static Replica replica2; + static Replica replica3; + static EndpointsForRange replicas; + static ReplicaLayout<?, ?> replicaLayout; + static long now = TimeUnit.NANOSECONDS.toMicros(System.nanoTime()); static DecoratedKey key; static Cell cell1; @@ -191,7 +203,8 @@ public abstract class AbstractReadRepairTest ks = Keyspace.open(ksName); cfs = ks.getColumnFamilyStore("tbl"); - cfs.sampleLatencyNanos = 0; + cfs.sampleReadLatencyNanos = 0; + cfs.transientWriteLatencyNanos = 0; target1 = InetAddressAndPort.getByName("127.0.0.255"); target2 = InetAddressAndPort.getByName("127.0.0.254"); @@ -199,6 +212,13 @@ public abstract class AbstractReadRepairTest targets = ImmutableList.of(target1, target2, target3); + replica1 = fullReplica(target1, FULL_RANGE); + replica2 = fullReplica(target2, FULL_RANGE); + 
replica3 = fullReplica(target3, FULL_RANGE); + replicas = EndpointsForRange.of(replica1, replica2, replica3); + + replicaLayout = replicaLayout(ConsistencyLevel.QUORUM, replicas); + // default test values key = dk(5); cell1 = cell("v", "val1", now); @@ -220,14 +240,26 @@ public abstract class AbstractReadRepairTest public void setUp() { assert configured : "configureClass must be called in a @BeforeClass method"; - cfs.sampleLatencyNanos = 0; + + cfs.sampleReadLatencyNanos = 0; + cfs.transientWriteLatencyNanos = 0; + } + + static ReplicaLayout.ForRange replicaLayout(EndpointsForRange replicas, EndpointsForRange targets) + { + return new ReplicaLayout.ForRange(ks, ConsistencyLevel.QUORUM, ReplicaUtils.FULL_BOUNDS, replicas, targets); + } + + static ReplicaLayout.ForRange replicaLayout(ConsistencyLevel consistencyLevel, EndpointsForRange replicas) + { + return new ReplicaLayout.ForRange(ks, consistencyLevel, ReplicaUtils.FULL_BOUNDS, replicas, replicas); } - public abstract InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, long queryStartNanoTime, ConsistencyLevel consistency); + public abstract InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaLayout<?, ?> replicaLayout, long queryStartNanoTime); - public InstrumentedReadRepair createInstrumentedReadRepair() + public InstrumentedReadRepair createInstrumentedReadRepair(ReplicaLayout<?, ?> replicaLayout) { - return createInstrumentedReadRepair(command, System.nanoTime(), ConsistencyLevel.QUORUM); + return createInstrumentedReadRepair(command, replicaLayout, System.nanoTime()); } @@ -238,12 +270,11 @@ public abstract class AbstractReadRepairTest @Test public void readSpeculationCycle() { - InstrumentedReadRepair repair = createInstrumentedReadRepair(); + InstrumentedReadRepair repair = createInstrumentedReadRepair(replicaLayout(replicas, EndpointsForRange.of(replica1, replica2))); ResultConsumer consumer = new ResultConsumer(); - Assert.assertEquals(epSet(), 
repair.getReadRecipients()); - repair.startRepair(null, targets, Lists.newArrayList(target1, target2), consumer); + repair.startRepair(null, consumer); Assert.assertEquals(epSet(target1, target2), repair.getReadRecipients()); repair.maybeSendAdditionalReads(); @@ -258,11 +289,11 @@ public abstract class AbstractReadRepairTest @Test public void noSpeculationRequired() { - InstrumentedReadRepair repair = createInstrumentedReadRepair(); + InstrumentedReadRepair repair = createInstrumentedReadRepair(replicaLayout(replicas, EndpointsForRange.of(replica1, replica2))); ResultConsumer consumer = new ResultConsumer(); Assert.assertEquals(epSet(), repair.getReadRecipients()); - repair.startRepair(null, targets, Lists.newArrayList(target1, target2), consumer); + repair.startRepair(null, consumer); Assert.assertEquals(epSet(target1, target2), repair.getReadRecipients()); repair.getReadCallback().response(msg(target1, cell1)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java index b06e88a..a574d02 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/BlockingReadRepairTest.java @@ -18,10 +18,8 @@ package org.apache.cassandra.service.reads.repair; -import java.util.Collection; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -31,24 +29,26 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import org.apache.cassandra.Util; import org.apache.cassandra.db.ConsistencyLevel; -import 
org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.ReadCommand; -import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.Endpoints; +import org.apache.cassandra.locator.EndpointsForRange; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.ReplicaLayout; +import org.apache.cassandra.locator.ReplicaUtils; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.service.reads.ReadCallback; public class BlockingReadRepairTest extends AbstractReadRepairTest { - - private static class InstrumentedReadRepairHandler extends BlockingPartitionRepair + private static class InstrumentedReadRepairHandler<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends BlockingPartitionRepair<E, L> { - public InstrumentedReadRepairHandler(Keyspace keyspace, DecoratedKey key, ConsistencyLevel consistency, Map<InetAddressAndPort, Mutation> repairs, int maxBlockFor, InetAddressAndPort[] participants) + public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, L replicaLayout) { - super(keyspace, key, consistency, repairs, maxBlockFor, participants); + super(Util.dk("not a real usable value"), repairs, maxBlockFor, replicaLayout); } Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>(); @@ -58,13 +58,6 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest mutationsSent.put(endpoint, message.payload); } - List<InetAddressAndPort> candidates = targets; - - protected List<InetAddressAndPort> getCandidateEndpoints() - { - return candidates; - } - @Override protected boolean isLocal(InetAddressAndPort endpoint) { @@ -78,23 +71,22 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest configureClass(ReadRepairStrategy.BLOCKING); } - private static InstrumentedReadRepairHandler 
createRepairHandler(Map<InetAddressAndPort, Mutation> repairs, int maxBlockFor, Collection<InetAddressAndPort> participants) + private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, ReplicaLayout<?, ?> replicaLayout) { - InetAddressAndPort[] participantArray = new InetAddressAndPort[participants.size()]; - participants.toArray(participantArray); - return new InstrumentedReadRepairHandler(ks, key, ConsistencyLevel.LOCAL_QUORUM, repairs, maxBlockFor, participantArray); + return new InstrumentedReadRepairHandler(repairs, maxBlockFor, replicaLayout); } - private static InstrumentedReadRepairHandler createRepairHandler(Map<InetAddressAndPort, Mutation> repairs, int maxBlockFor) + private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor) { - return createRepairHandler(repairs, maxBlockFor, repairs.keySet()); + EndpointsForRange replicas = EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())); + return createRepairHandler(repairs, maxBlockFor, replicaLayout(replicas, replicas)); } - private static class InstrumentedBlockingReadRepair extends BlockingReadRepair implements InstrumentedReadRepair + private static class InstrumentedBlockingReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends BlockingReadRepair<E, L> implements InstrumentedReadRepair<E, L> { - public InstrumentedBlockingReadRepair(ReadCommand command, long queryStartNanoTime, ConsistencyLevel consistency) + public InstrumentedBlockingReadRepair(ReadCommand command, L replicaLayout, long queryStartNanoTime) { - super(command, queryStartNanoTime, consistency); + super(command, replicaLayout, queryStartNanoTime); } Set<InetAddressAndPort> readCommandRecipients = new HashSet<>(); @@ -109,12 +101,6 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest } @Override - Iterable<InetAddressAndPort> getCandidatesForToken(Token token) - { - return targets; - } - - 
@Override public Set<InetAddressAndPort> getReadRecipients() { return readCommandRecipients; @@ -128,9 +114,9 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest } @Override - public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, long queryStartNanoTime, ConsistencyLevel consistency) + public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaLayout<?, ?> replicaLayout, long queryStartNanoTime) { - return new InstrumentedBlockingReadRepair(command, queryStartNanoTime, consistency); + return new InstrumentedBlockingReadRepair(command, replicaLayout, queryStartNanoTime); } @Test @@ -152,12 +138,12 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest Mutation repair2 = mutation(cell1); // check that the correct repairs are calculated - Map<InetAddressAndPort, Mutation> repairs = new HashMap<>(); - repairs.put(target1, repair1); - repairs.put(target2, repair2); - + Map<Replica, Mutation> repairs = new HashMap<>(); + repairs.put(replica1, repair1); + repairs.put(replica2, repair2); - InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2); + ReplicaLayout.ForRange replicaLayout = replicaLayout(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet()))); + InstrumentedReadRepairHandler<?, ?> handler = createRepairHandler(repairs, 2, replicaLayout); Assert.assertTrue(handler.mutationsSent.isEmpty()); @@ -188,9 +174,9 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest @Test public void noAdditionalMutationRequired() throws Exception { - Map<InetAddressAndPort, Mutation> repairs = new HashMap<>(); - repairs.put(target1, mutation(cell2)); - repairs.put(target2, mutation(cell1)); + Map<Replica, Mutation> repairs = new HashMap<>(); + repairs.put(replica1, mutation(cell2)); + repairs.put(replica2, mutation(cell1)); InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2); handler.sendInitialRepairs(); @@ -209,15 +195,14 @@ 
public class BlockingReadRepairTest extends AbstractReadRepairTest @Test public void noAdditionalMutationPossible() throws Exception { - Map<InetAddressAndPort, Mutation> repairs = new HashMap<>(); - repairs.put(target1, mutation(cell2)); - repairs.put(target2, mutation(cell1)); + Map<Replica, Mutation> repairs = new HashMap<>(); + repairs.put(replica1, mutation(cell2)); + repairs.put(replica2, mutation(cell1)); InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2); handler.sendInitialRepairs(); // we've already sent mutations to all candidates, so we shouldn't send any more - handler.candidates = Lists.newArrayList(target1, target2); handler.mutationsSent.clear(); handler.maybeSendAdditionalWrites(0, TimeUnit.NANOSECONDS); Assert.assertTrue(handler.mutationsSent.isEmpty()); @@ -232,12 +217,11 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest { Mutation repair1 = mutation(cell2); - Map<InetAddressAndPort, Mutation> repairs = new HashMap<>(); - repairs.put(target1, repair1); - Collection<InetAddressAndPort> participants = Lists.newArrayList(target1, target2); + Map<Replica, Mutation> repairs = new HashMap<>(); + repairs.put(replica1, repair1); // check that the correct initial mutations are sent out - InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, participants); + InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicaLayout(replicas, EndpointsForRange.of(replica1, replica2))); handler.sendInitialRepairs(); Assert.assertEquals(1, handler.mutationsSent.size()); Assert.assertTrue(handler.mutationsSent.containsKey(target1)); @@ -252,10 +236,10 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest @Test public void onlyBlockOnQuorum() { - Map<InetAddressAndPort, Mutation> repairs = new HashMap<>(); - repairs.put(target1, mutation(cell1)); - repairs.put(target2, mutation(cell2)); - repairs.put(target3, mutation(cell3)); + Map<Replica, Mutation> repairs = new 
HashMap<>(); + repairs.put(replica1, mutation(cell1)); + repairs.put(replica2, mutation(cell2)); + repairs.put(replica3, mutation(cell3)); Assert.assertEquals(3, repairs.size()); InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2); @@ -277,30 +261,29 @@ public class BlockingReadRepairTest extends AbstractReadRepairTest @Test public void remoteDCTest() throws Exception { - Map<InetAddressAndPort, Mutation> repairs = new HashMap<>(); - repairs.put(target1, mutation(cell1)); - + Map<Replica, Mutation> repairs = new HashMap<>(); + repairs.put(replica1, mutation(cell1)); - InetAddressAndPort remote1 = InetAddressAndPort.getByName("10.0.0.1"); - InetAddressAndPort remote2 = InetAddressAndPort.getByName("10.0.0.2"); + Replica remote1 = ReplicaUtils.full(InetAddressAndPort.getByName("10.0.0.1")); + Replica remote2 = ReplicaUtils.full(InetAddressAndPort.getByName("10.0.0.2")); repairs.put(remote1, mutation(cell1)); - Collection<InetAddressAndPort> participants = Lists.newArrayList(target1, target2, remote1, remote2); - - InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, participants); + EndpointsForRange participants = EndpointsForRange.of(replica1, replica2, remote1, remote2); + ReplicaLayout.ForRange replicaLayout = new ReplicaLayout.ForRange(ks, ConsistencyLevel.LOCAL_QUORUM, ReplicaUtils.FULL_BOUNDS, participants, participants); + InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicaLayout); handler.sendInitialRepairs(); Assert.assertEquals(2, handler.mutationsSent.size()); - Assert.assertTrue(handler.mutationsSent.containsKey(target1)); - Assert.assertTrue(handler.mutationsSent.containsKey(remote1)); + Assert.assertTrue(handler.mutationsSent.containsKey(replica1.endpoint())); + Assert.assertTrue(handler.mutationsSent.containsKey(remote1.endpoint())); Assert.assertEquals(1, handler.waitingOn()); Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); - handler.ack(remote1); + 
handler.ack(remote1.endpoint()); Assert.assertEquals(1, handler.waitingOn()); Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); - handler.ack(target1); + handler.ack(replica1.endpoint()); Assert.assertEquals(0, handler.waitingOn()); Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java index 1f07c2b..c345ee6 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/DiagEventsBlockingReadRepairTest.java @@ -26,20 +26,23 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import com.google.common.collect.Lists; import org.junit.After; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.config.OverrideConfigurationLoader; -import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.dht.Token; import org.apache.cassandra.diag.DiagnosticEventService; +import org.apache.cassandra.locator.Endpoints; +import org.apache.cassandra.locator.EndpointsForRange; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.ReplicaLayout; import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.service.reads.ReadCallback; import 
org.apache.cassandra.service.reads.repair.ReadRepairEvent.ReadRepairEventType; @@ -72,12 +75,13 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest Mutation repair2 = mutation(cell1); // check that the correct repairs are calculated - Map<InetAddressAndPort, Mutation> repairs = new HashMap<>(); - repairs.put(target1, repair1); - repairs.put(target2, repair2); + Map<Replica, Mutation> repairs = new HashMap<>(); + repairs.put(replica1, repair1); + repairs.put(replica2, repair2); - DiagnosticPartitionReadRepairHandler handler = createRepairHandler(repairs, 2); + ReplicaLayout.ForRange replicaLayout = replicaLayout(replicas, EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet()))); + DiagnosticPartitionReadRepairHandler handler = createRepairHandler(repairs, 2, replicaLayout); Assert.assertTrue(handler.updatesByEp.isEmpty()); @@ -102,17 +106,20 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS)); } - public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, long queryStartNanoTime, ConsistencyLevel consistency) + public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaLayout<?, ?> replicaLayout, long queryStartNanoTime) { - return new DiagnosticBlockingRepairHandler(command, queryStartNanoTime, consistency); + return new DiagnosticBlockingRepairHandler(command, replicaLayout, queryStartNanoTime); } - private static DiagnosticPartitionReadRepairHandler createRepairHandler(Map<InetAddressAndPort, Mutation> repairs, int maxBlockFor) + private static DiagnosticPartitionReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, ReplicaLayout<?, ?> replicaLayout) { - Set<InetAddressAndPort> participants = repairs.keySet(); - InetAddressAndPort[] participantArray = new InetAddressAndPort[participants.size()]; - participants.toArray(participantArray); - return new 
DiagnosticPartitionReadRepairHandler(ks, key, ConsistencyLevel.LOCAL_QUORUM, repairs, maxBlockFor, participantArray); + return new DiagnosticPartitionReadRepairHandler(key, repairs, maxBlockFor, replicaLayout); + } + + private static DiagnosticPartitionReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor) + { + EndpointsForRange replicas = EndpointsForRange.copyOf(Lists.newArrayList(repairs.keySet())); + return createRepairHandler(repairs, maxBlockFor, replicaLayout(replicas, replicas)); } private static class DiagnosticBlockingRepairHandler extends BlockingReadRepair implements InstrumentedReadRepair @@ -120,9 +127,9 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest private Set<InetAddressAndPort> recipients = Collections.emptySet(); private ReadCallback readCallback = null; - DiagnosticBlockingRepairHandler(ReadCommand command, long queryStartNanoTime, ConsistencyLevel consistency) + DiagnosticBlockingRepairHandler(ReadCommand command, ReplicaLayout<?, ?> replicaLayout, long queryStartNanoTime) { - super(command, queryStartNanoTime, consistency); + super(command, replicaLayout, queryStartNanoTime); DiagnosticEventService.instance().subscribe(ReadRepairEvent.class, this::onRepairEvent); } @@ -130,7 +137,7 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest { if (e.getType() == ReadRepairEventType.START_REPAIR) recipients = new HashSet<>(e.destinations); else if (e.getType() == ReadRepairEventType.SPECULATED_READ) recipients.addAll(e.destinations); - Assert.assertEquals(targets, e.allEndpoints); + Assert.assertEquals(new HashSet<>(targets), new HashSet<>(e.allEndpoints)); Assert.assertNotNull(e.toMap()); } @@ -156,13 +163,13 @@ public class DiagEventsBlockingReadRepairTest extends AbstractReadRepairTest } } - private static class DiagnosticPartitionReadRepairHandler extends BlockingPartitionRepair + private static class DiagnosticPartitionReadRepairHandler<E extends 
Endpoints<E>, L extends ReplicaLayout<E, L>> extends BlockingPartitionRepair<E, L> { private final Map<InetAddressAndPort, String> updatesByEp = new HashMap<>(); - DiagnosticPartitionReadRepairHandler(Keyspace keyspace, DecoratedKey key, ConsistencyLevel consistency, Map<InetAddressAndPort, Mutation> repairs, int maxBlockFor, InetAddressAndPort[] participants) + DiagnosticPartitionReadRepairHandler(DecoratedKey key, Map<Replica, Mutation> repairs, int maxBlockFor, L replicaLayout) { - super(keyspace, key, consistency, repairs, maxBlockFor, participants); + super(key, repairs, maxBlockFor, replicaLayout); DiagnosticEventService.instance().subscribe(PartitionRepairEvent.class, this::onRepairEvent); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java b/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java index 2fb8ffc..f3d2866 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/InstrumentedReadRepair.java @@ -20,10 +20,12 @@ package org.apache.cassandra.service.reads.repair; import java.util.Set; +import org.apache.cassandra.locator.Endpoints; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.ReplicaLayout; import org.apache.cassandra.service.reads.ReadCallback; -public interface InstrumentedReadRepair extends ReadRepair +public interface InstrumentedReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends ReadRepair<E, L> { Set<InetAddressAndPort> getReadRecipients(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java 
---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java index efce59a..9bb7eed 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepairTest.java @@ -26,20 +26,20 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; -import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.Endpoints; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.ReplicaLayout; import org.apache.cassandra.service.reads.ReadCallback; public class ReadOnlyReadRepairTest extends AbstractReadRepairTest { - private static class InstrumentedReadOnlyReadRepair extends ReadOnlyReadRepair implements InstrumentedReadRepair + private static class InstrumentedReadOnlyReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends ReadOnlyReadRepair implements InstrumentedReadRepair { - public InstrumentedReadOnlyReadRepair(ReadCommand command, long queryStartNanoTime, ConsistencyLevel consistency) + public InstrumentedReadOnlyReadRepair(ReadCommand command, L replicaLayout, long queryStartNanoTime) { - super(command, queryStartNanoTime, consistency); + super(command, replicaLayout, queryStartNanoTime); } Set<InetAddressAndPort> readCommandRecipients = new HashSet<>(); @@ -54,12 +54,6 @@ public class ReadOnlyReadRepairTest extends AbstractReadRepairTest } @Override - Iterable<InetAddressAndPort> getCandidatesForToken(Token token) - { - return targets; - } - - @Override public Set<InetAddressAndPort> getReadRecipients() { return readCommandRecipients; 
@@ -79,22 +73,24 @@ public class ReadOnlyReadRepairTest extends AbstractReadRepairTest } @Override - public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, long queryStartNanoTime, ConsistencyLevel consistency) + public InstrumentedReadRepair createInstrumentedReadRepair(ReadCommand command, ReplicaLayout<?, ?> replicaLayout, long queryStartNanoTime) { - return new InstrumentedReadOnlyReadRepair(command, queryStartNanoTime, consistency); + return new InstrumentedReadOnlyReadRepair(command, replicaLayout, queryStartNanoTime); } @Test public void getMergeListener() { - InstrumentedReadRepair repair = createInstrumentedReadRepair(); - Assert.assertSame(UnfilteredPartitionIterators.MergeListener.NOOP, repair.getMergeListener(new InetAddressAndPort[]{})); + ReplicaLayout<?, ?> replicaLayout = replicaLayout(replicas, replicas); + InstrumentedReadRepair repair = createInstrumentedReadRepair(replicaLayout); + Assert.assertSame(UnfilteredPartitionIterators.MergeListener.NOOP, repair.getMergeListener(replicaLayout)); } @Test(expected = UnsupportedOperationException.class) public void repairPartitionFailure() { - InstrumentedReadRepair repair = createInstrumentedReadRepair(); - repair.repairPartition(dk(1), Collections.emptyMap(), new InetAddressAndPort[]{}); + ReplicaLayout<?, ?> replicaLayout = replicaLayout(replicas, replicas); + InstrumentedReadRepair repair = createInstrumentedReadRepair(replicaLayout); + repair.repairPartition(null, Collections.emptyMap(), replicaLayout); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java new file mode 100644 index 0000000..e4ba25d --- /dev/null +++ 
b/test/unit/org/apache/cassandra/service/reads/repair/ReadRepairTest.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.reads.repair;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Iterables;
+
+import org.apache.cassandra.Util;
+import org.apache.cassandra.locator.Endpoints;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.SchemaLoader;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.cql3.statements.schema.CreateTableStatement;
+import org.apache.cassandra.db.Clustering;
+import org.apache.cassandra.db.ColumnFamilyStore;
+import org.apache.cassandra.db.ConsistencyLevel;
+import org.apache.cassandra.db.DecoratedKey;
+import org.apache.cassandra.db.Keyspace;
+import org.apache.cassandra.db.Mutation;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaLayout;
+import org.apache.cassandra.locator.ReplicaUtils;
+import org.apache.cassandra.net.MessageOut;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.KeyspaceParams;
+import org.apache.cassandra.schema.MigrationManager;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.schema.Tables;
+import org.apache.cassandra.utils.ByteBufferUtil;
+
+import static org.apache.cassandra.locator.ReplicaUtils.full;
+
+/**
+ * Unit tests for {@link BlockingPartitionRepair}, driven through an instrumented subclass
+ * that records repair mutations in a map so the tests can inspect what would be sent.
+ */
+public class ReadRepairTest
+{
+    static Keyspace ks;
+    static ColumnFamilyStore cfs;
+    static TableMetadata cfm;
+    static Replica target1;
+    static Replica target2;
+    static Replica target3;
+    static EndpointsForRange targets;
+
+    /**
+     * {@link BlockingPartitionRepair} that stores every mutation handed to {@code sendRR} in
+     * {@code mutationsSent}, and reports an endpoint as local only if it is one of {@code targets}.
+     */
+    private static class InstrumentedReadRepairHandler<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> extends BlockingPartitionRepair<E, L>
+    {
+        public InstrumentedReadRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, L replicaLayout)
+        {
+            super(Util.dk("not a valid key"), repairs, maxBlockFor, replicaLayout);
+        }
+
+        Map<InetAddressAndPort, Mutation> mutationsSent = new HashMap<>();
+
+        // capture the mutation by destination endpoint so tests can assert on it
+        protected void sendRR(MessageOut<Mutation> message, InetAddressAndPort endpoint)
+        {
+            mutationsSent.put(endpoint, message.payload);
+        }
+
+        @Override
+        protected boolean isLocal(InetAddressAndPort endpoint)
+        {
+            return targets.endpoints().contains(endpoint);
+        }
+    }
+
+    static long now = TimeUnit.NANOSECONDS.toMicros(System.nanoTime());
+    static DecoratedKey key;
+    static Cell cell1;
+    static Cell cell2;
+    static Cell cell3;
+    static Mutation resolved;
+
+    /**
+     * Asserts that both rows are null, or that they have equal clustering, deletion and cells.
+     * Any failure is rethrown as an {@link AssertionError} whose message includes both rows.
+     */
+    private static void assertRowsEqual(Row expected, Row actual)
+    {
+        try
+        {
+            Assert.assertEquals(expected == null, actual == null);
+            if (expected == null)
+                return;
+            Assert.assertEquals(expected.clustering(), actual.clustering());
+            Assert.assertEquals(expected.deletion(), actual.deletion());
+            // the right-hand side must come from the actual row; comparing expected with itself can never fail
+            Assert.assertArrayEquals(Iterables.toArray(expected.cells(), Cell.class), Iterables.toArray(actual.cells(), Cell.class));
+        }
+        catch (Throwable t)
+        {
+            throw new AssertionError(String.format("Row comparison failed, expected %s got %s", expected, actual), t);
+        }
+    }
+
+    @BeforeClass
+    public static void setUpClass() throws Throwable
+    {
+        SchemaLoader.loadSchema();
+        String ksName = "ks";
+
+        cfm = CreateTableStatement.parse("CREATE TABLE tbl (k int primary key, v text)", ksName).build();
+        KeyspaceMetadata ksm = KeyspaceMetadata.create(ksName, KeyspaceParams.simple(3), Tables.of(cfm));
+        MigrationManager.announceNewKeyspace(ksm, false);
+
+        ks = Keyspace.open(ksName);
+        cfs = ks.getColumnFamilyStore("tbl");
+
+        cfs.sampleReadLatencyNanos = 0;
+
+        target1 = full(InetAddressAndPort.getByName("127.0.0.255"));
+        target2 = full(InetAddressAndPort.getByName("127.0.0.254"));
+        target3 = full(InetAddressAndPort.getByName("127.0.0.253"));
+
+        targets = EndpointsForRange.of(target1, target2, target3);
+
+        // default test values
+        key = dk(5);
+        cell1 = cell("v", "val1", now);
+        cell2 = cell("v", "val2", now);
+        cell3 = cell("v", "val3", now);
+        resolved = mutation(cell1, cell2);
+    }
+
+    private static DecoratedKey dk(int v)
+    {
+        return DatabaseDescriptor.getPartitioner().decorateKey(ByteBufferUtil.bytes(v));
+    }
+
+    private static Cell cell(String name, String value, long timestamp)
+    {
+        return BufferCell.live(cfm.getColumn(ColumnIdentifier.getInterned(name, false)), timestamp, ByteBufferUtil.bytes(value));
+    }
+
+    /** Builds a mutation for {@code key} holding one row (empty clustering) with the given cells. */
+    private static Mutation mutation(Cell... cells)
+    {
+        Row.Builder builder = BTreeRow.unsortedBuilder();
+        builder.newRow(Clustering.EMPTY);
+        for (Cell cell: cells)
+        {
+            builder.addCell(cell);
+        }
+        return new Mutation(PartitionUpdate.singleRowUpdate(cfm, key, builder.build()));
+    }
+
+    /** Creates an instrumented handler whose LOCAL_QUORUM range layout is built from {@code all} and {@code targets}. */
+    private static InstrumentedReadRepairHandler createRepairHandler(Map<Replica, Mutation> repairs, int maxBlockFor, EndpointsForRange all, EndpointsForRange targets)
+    {
+        ReplicaLayout.ForRange replicaLayout = new ReplicaLayout.ForRange(ks, ConsistencyLevel.LOCAL_QUORUM, ReplicaUtils.FULL_BOUNDS, all, targets);
+        return new InstrumentedReadRepairHandler(repairs, maxBlockFor, replicaLayout);
+    }
+
+    @Test
+    public void consistencyLevelTest() throws Exception
+    {
+        Assert.assertTrue(ConsistencyLevel.QUORUM.satisfies(ConsistencyLevel.QUORUM, ks));
+        Assert.assertTrue(ConsistencyLevel.THREE.satisfies(ConsistencyLevel.QUORUM, ks));
+        Assert.assertTrue(ConsistencyLevel.TWO.satisfies(ConsistencyLevel.QUORUM, ks));
+        Assert.assertFalse(ConsistencyLevel.ONE.satisfies(ConsistencyLevel.QUORUM, ks));
+        Assert.assertFalse(ConsistencyLevel.ANY.satisfies(ConsistencyLevel.QUORUM, ks));
+    }
+
+    /** Asserts that two mutations share keyspace name and key, and that their single rows are equal. */
+    private static void assertMutationEqual(Mutation expected, Mutation actual)
+    {
+        Assert.assertEquals(expected.getKeyspaceName(), actual.getKeyspaceName());
+        Assert.assertEquals(expected.key(), actual.key());
+        PartitionUpdate expectedUpdate = Iterables.getOnlyElement(expected.getPartitionUpdates());
+        PartitionUpdate actualUpdate = Iterables.getOnlyElement(actual.getPartitionUpdates());
+        assertRowsEqual(Iterables.getOnlyElement(expectedUpdate), Iterables.getOnlyElement(actualUpdate));
+    }
+
+    @Test
+    public void additionalMutationRequired() throws Exception
+    {
+        Mutation repair1 = mutation(cell2);
+        Mutation repair2 = mutation(cell1);
+
+        // check that the correct repairs are calculated
+        Map<Replica, Mutation> repairs = new HashMap<>();
+        repairs.put(target1, repair1);
+        repairs.put(target2, repair2);
+
+        InstrumentedReadRepairHandler<?, ?> handler = createRepairHandler(repairs, 2,
+                                                                          targets, EndpointsForRange.of(target1, target2));
+
+        Assert.assertTrue(handler.mutationsSent.isEmpty());
+
+        // check that the correct mutations are sent
+        handler.sendInitialRepairs();
+        Assert.assertEquals(2, handler.mutationsSent.size());
+        assertMutationEqual(repair1, handler.mutationsSent.get(target1.endpoint()));
+        assertMutationEqual(repair2, handler.mutationsSent.get(target2.endpoint()));
+
+        // check that a combined mutation is speculatively sent to the 3rd target
+        handler.mutationsSent.clear();
+        handler.maybeSendAdditionalWrites(0, TimeUnit.NANOSECONDS);
+        Assert.assertEquals(1, handler.mutationsSent.size());
+        assertMutationEqual(resolved, handler.mutationsSent.get(target3.endpoint()));
+
+        // check repairs stop blocking after receiving 2 acks
+        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        handler.ack(target1.endpoint());
+        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        handler.ack(target3.endpoint());
+        Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+    }
+
+    /**
+     * If we've received enough acks, we shouldn't send any additional mutations
+     */
+    @Test
+    public void noAdditionalMutationRequired() throws Exception
+    {
+        Map<Replica, Mutation> repairs = new HashMap<>();
+        repairs.put(target1, mutation(cell2));
+        repairs.put(target2, mutation(cell1));
+
+        EndpointsForRange replicas = EndpointsForRange.of(target1, target2);
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicas, targets);
+        handler.sendInitialRepairs();
+        handler.ack(target1.endpoint());
+        handler.ack(target2.endpoint());
+
+        // both replicas have acked, we shouldn't send anything else out
+        handler.mutationsSent.clear();
+        handler.maybeSendAdditionalWrites(0, TimeUnit.NANOSECONDS);
+        Assert.assertTrue(handler.mutationsSent.isEmpty());
+    }
+
+    /**
+     * If there are no additional nodes we can send mutations to, we... shouldn't
+     */
+    @Test
+    public void noAdditionalMutationPossible() throws Exception
+    {
+        Map<Replica, Mutation> repairs = new HashMap<>();
+        repairs.put(target1, mutation(cell2));
+        repairs.put(target2, mutation(cell1));
+
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, EndpointsForRange.of(target1, target2),
+                                                                    EndpointsForRange.of(target1, target2));
+        handler.sendInitialRepairs();
+
+        // we've already sent mutations to all candidates, so we shouldn't send any more
+        handler.mutationsSent.clear();
+        handler.maybeSendAdditionalWrites(0, TimeUnit.NANOSECONDS);
+        Assert.assertTrue(handler.mutationsSent.isEmpty());
+    }
+
+    /**
+     * If we didn't send a repair to a replica because there wasn't a diff with the
+     * resolved column family, we shouldn't send it a speculative mutation
+     */
+    @Test
+    public void mutationsArentSentToInSyncNodes() throws Exception
+    {
+        Mutation repair1 = mutation(cell2);
+
+        Map<Replica, Mutation> repairs = new HashMap<>();
+        repairs.put(target1, repair1);
+
+        // check that the correct initial mutations are sent out
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, targets, EndpointsForRange.of(target1, target2));
+        handler.sendInitialRepairs();
+        Assert.assertEquals(1, handler.mutationsSent.size());
+        Assert.assertTrue(handler.mutationsSent.containsKey(target1.endpoint()));
+
+        // check that speculative mutations aren't sent to target2
+        handler.mutationsSent.clear();
+        handler.maybeSendAdditionalWrites(0, TimeUnit.NANOSECONDS);
+
+        Assert.assertEquals(1, handler.mutationsSent.size());
+        Assert.assertTrue(handler.mutationsSent.containsKey(target3.endpoint()));
+    }
+
+    @Test
+    public void onlyBlockOnQuorum()
+    {
+        Map<Replica, Mutation> repairs = new HashMap<>();
+        repairs.put(target1, mutation(cell1));
+        repairs.put(target2, mutation(cell2));
+        repairs.put(target3, mutation(cell3));
+        Assert.assertEquals(3, repairs.size());
+
+        EndpointsForRange replicas = EndpointsForRange.of(target1, target2, target3);
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, replicas, replicas);
+        handler.sendInitialRepairs();
+
+        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+        handler.ack(target1.endpoint());
+        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+
+        // here we should stop blocking, even though we've sent 3 repairs
+        handler.ack(target2.endpoint());
+        Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+    }
+
+    /**
+     * For dc local consistency levels, noop mutations and responses from remote dcs should not affect effective blockFor
+     */
+    @Test
+    public void remoteDCTest() throws Exception
+    {
+        Map<Replica, Mutation> repairs = new HashMap<>();
+        repairs.put(target1, mutation(cell1));
+
+        Replica remote1 = full(InetAddressAndPort.getByName("10.0.0.1"));
+        Replica remote2 = full(InetAddressAndPort.getByName("10.0.0.2"));
+        repairs.put(remote1, mutation(cell1));
+
+        EndpointsForRange participants = EndpointsForRange.of(target1, target2, remote1, remote2);
+        EndpointsForRange targets = EndpointsForRange.of(target1, target2);
+
+        InstrumentedReadRepairHandler handler = createRepairHandler(repairs, 2, participants, targets);
+        handler.sendInitialRepairs();
+        Assert.assertEquals(2, handler.mutationsSent.size());
+        Assert.assertTrue(handler.mutationsSent.containsKey(target1.endpoint()));
+        Assert.assertTrue(handler.mutationsSent.containsKey(remote1.endpoint()));
+
+        Assert.assertEquals(1, handler.waitingOn());
+        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+
+        handler.ack(remote1.endpoint());
+        Assert.assertEquals(1, handler.waitingOn());
+        Assert.assertFalse(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+
+        handler.ack(target1.endpoint());
+        Assert.assertEquals(0, handler.waitingOn());
+        Assert.assertTrue(handler.awaitRepairs(0, TimeUnit.NANOSECONDS));
+    }
+}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java index f97980b..2a2dec2 100644 --- a/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java +++ b/test/unit/org/apache/cassandra/service/reads/repair/TestableReadRepair.java @@ -29,17 +29,25 @@ import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.ReadCommand; import org.apache.cassandra.db.partitions.PartitionIterator; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; +import org.apache.cassandra.db.rows.UnfilteredRowIterator; +import org.apache.cassandra.db.rows.UnfilteredRowIterators; import org.apache.cassandra.exceptions.ReadTimeoutException; +import org.apache.cassandra.locator.Endpoints; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.locator.ReplicaLayout; import org.apache.cassandra.service.reads.DigestResolver; -public class TestableReadRepair implements ReadRepair +public class TestableReadRepair<E extends Endpoints<E>, L extends ReplicaLayout<E, L>> implements ReadRepair<E, L> { public final Map<InetAddressAndPort, Mutation> sent = new HashMap<>(); private final ReadCommand command; private final ConsistencyLevel consistency; + private boolean partitionListenerClosed = false; + private boolean rowListenerClosed = true; + public TestableReadRepair(ReadCommand command, ConsistencyLevel consistency) { this.command = command; @@ -47,13 +55,35 @@ public class TestableReadRepair implements ReadRepair } @Override - public UnfilteredPartitionIterators.MergeListener getMergeListener(InetAddressAndPort[] endpoints) + public 
UnfilteredPartitionIterators.MergeListener getMergeListener(L endpoints) { - return new PartitionIteratorMergeListener(endpoints, command, consistency, this); + return new PartitionIteratorMergeListener(endpoints, command, consistency, this) { + @Override + public void close() + { + super.close(); + partitionListenerClosed = true; + } + + @Override + public UnfilteredRowIterators.MergeListener getRowMergeListener(DecoratedKey partitionKey, List<UnfilteredRowIterator> versions) + { + assert rowListenerClosed; + rowListenerClosed = false; + return new RowIteratorMergeListener(partitionKey, columns(versions), isReversed(versions), endpoints, command, consistency, TestableReadRepair.this) { + @Override + public void close() + { + super.close(); + rowListenerClosed = true; + } + }; + } + }; } @Override - public void startRepair(DigestResolver digestResolver, List<InetAddressAndPort> allEndpoints, List<InetAddressAndPort> contactedEndpoints, Consumer<PartitionIterator> resultConsumer) + public void startRepair(DigestResolver<E, L> digestResolver, Consumer<PartitionIterator> resultConsumer) { } @@ -83,13 +113,19 @@ public class TestableReadRepair implements ReadRepair } @Override - public void repairPartition(DecoratedKey key, Map<InetAddressAndPort, Mutation> mutations, InetAddressAndPort[] destinations) + public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, L replicaLayout) { - sent.putAll(mutations); + for (Map.Entry<Replica, Mutation> entry: mutations.entrySet()) + sent.put(entry.getKey().endpoint(), entry.getValue()); } public Mutation getForEndpoint(InetAddressAndPort endpoint) { return sent.get(endpoint); } -} + + public boolean dataWasConsumed() + { + return partitionListenerClosed && rowListenerClosed; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java 
---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java index bc501be..909e221 100644 --- a/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java +++ b/test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import org.apache.cassandra.locator.RangesAtEndpoint; import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; @@ -144,7 +145,7 @@ public class StreamingTransferTest ranges.add(new Range<>(p.getToken(ByteBufferUtil.bytes("key2")), p.getMinimumToken())); StreamResultFuture futureResult = new StreamPlan(StreamOperation.OTHER) - .requestRanges(LOCAL, KEYSPACE2, ranges) + .requestRanges(LOCAL, KEYSPACE2, RangesAtEndpoint.toDummyList(ranges), RangesAtEndpoint.toDummyList(Collections.emptyList())) .execute(); UUID planId = futureResult.planId; @@ -238,13 +239,13 @@ public class StreamingTransferTest List<Range<Token>> ranges = new ArrayList<>(); // wrapped range ranges.add(new Range<Token>(p.getToken(ByteBufferUtil.bytes("key1")), p.getToken(ByteBufferUtil.bytes("key0")))); - StreamPlan streamPlan = new StreamPlan(StreamOperation.OTHER).transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getTableName()); + StreamPlan streamPlan = new StreamPlan(StreamOperation.OTHER).transferRanges(LOCAL, cfs.keyspace.getName(), RangesAtEndpoint.toDummyList(ranges), cfs.getTableName()); streamPlan.execute().get(); //cannot add ranges after stream session is finished try { - streamPlan.transferRanges(LOCAL, cfs.keyspace.getName(), ranges, cfs.getTableName()); + streamPlan.transferRanges(LOCAL, cfs.keyspace.getName(), RangesAtEndpoint.toDummyList(ranges), 
cfs.getTableName()); fail("Should have thrown exception"); } catch (RuntimeException e) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java b/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java index 2842374..f8f6b12 100644 --- a/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java +++ b/test/unit/org/apache/cassandra/utils/concurrent/AccumulatorTest.java @@ -95,7 +95,7 @@ public class AccumulatorTest assertEquals("0", accu.get(3)); - Iterator<String> iter = accu.iterator(); + Iterator<String> iter = accu.snapshot().iterator(); assertEquals("3", iter.next()); assertEquals("2", iter.next()); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
