Repository: cassandra Updated Branches: refs/heads/trunk 8a73427c6 -> 2046c30ad
Fail query on transient replica if coordinator only expects full data Patch by Alex Petrov; reviewed by Benedict Elliott Smith for CASSANDRA-14704 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/2046c30a Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/2046c30a Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/2046c30a Branch: refs/heads/trunk Commit: 2046c30adec194fb07bc5dd1c31fc19a64e7895c Parents: 8a73427 Author: Alex Petrov <[email protected]> Authored: Fri Sep 7 11:02:55 2018 +0200 Committer: Alex Petrov <[email protected]> Committed: Wed Sep 12 17:02:02 2018 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/ReadCommandVerbHandler.java | 42 +++++++++++ .../locator/AbstractReplicationStrategy.java | 7 ++ .../apache/cassandra/service/StorageProxy.java | 7 +- .../unit/org/apache/cassandra/SchemaLoader.java | 20 +++-- .../org/apache/cassandra/cql3/CQLTester.java | 1 + .../db/ReadCommandVerbHandlerTest.java | 77 +++++++++++++------- 7 files changed, 121 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/2046c30a/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 264c80f..a30bec7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Fail query on transient replica if coordinator only expects full data (CASSANDRA-14704) * Remove mentions of transient replication from repair path (CASSANDRA-14698) * Fix handleRepairStatusChangedNotification to remove first then add (CASSANDRA-14720) * Allow transient node to serve as a repair coordinator (CASSANDRA-14693) http://git-wip-us.apache.org/repos/asf/cassandra/blob/2046c30a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java index 1b28c2c..0e97dd8 100644 --- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java +++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java @@ -17,8 +17,14 @@ */ package org.apache.cassandra.db; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.io.IVersionedSerializer; +import org.apache.cassandra.locator.Replica; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; @@ -29,6 +35,8 @@ import org.apache.cassandra.tracing.Tracing; public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand> { + private static final Logger logger = LoggerFactory.getLogger(ReadCommandVerbHandler.class); + protected IVersionedSerializer<ReadResponse> serializer() { return ReadResponse.serializer; @@ -42,6 +50,8 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand> } ReadCommand command = message.payload; + validateTransientStatus(message); + command.setMonitoringTime(message.constructionTime, message.isCrossNode(), message.getTimeout(), message.getSlowQueryTimeout()); if (message.parameters.containsKey(ParameterType.TRACK_REPAIRED_DATA)) @@ -65,4 +75,36 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand> MessageOut<ReadResponse> reply = new MessageOut<>(MessagingService.Verb.REQUEST_RESPONSE, response, serializer()); MessagingService.instance().sendReply(reply, id, message.from); } + + private void validateTransientStatus(MessageIn<ReadCommand> message) + { /* Rejects (and records as dropped) a read that only accepts full data when this node is a transient replica for the requested token. */ + ReadCommand command = message.payload; + Token token; 
+ + if (command.isLimitedToOnePartition()) + token = ((SinglePartitionReadCommand) command).partitionKey().getToken(); + else + token = ((PartitionRangeReadCommand) command).dataRange().keyRange().right.getToken(); /* range reads are checked against the right bound of the requested key range */ + + Replica replica = Keyspace.open(command.metadata().keyspace) + .getReplicationStrategy() + .getLocalReplicaFor(token); + + if (replica == null) /* this node is not a natural replica for the token: warn, then let the read proceed */ + { + logger.warn("Received a read request {} from {} for a range that is not owned by the current replica.", + command, + message.from); + return; + } + + if (!command.acceptsTransient() && replica.isTransient()) + { + MessagingService.instance().incrementDroppedMessages(message, message.getLifetimeInMS()); + throw new InvalidRequestException(String.format("Attempted to serve %s data request from %s node in %s", + command.acceptsTransient() ? "transient" : "full", /* kind of data the coordinator requested */ + replica.isTransient() ? "transient" : "full", /* kind of replica this node is for the token */ + replica)); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2046c30a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java index 0ddc0a4..d168052 100644 --- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java +++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java @@ -124,6 +124,13 @@ public abstract class AbstractReplicationStrategy return endpoints; } + public Replica getLocalReplicaFor(RingPosition searchPosition) + { /* This node's entry (matched by broadcast address) among the natural replicas of searchPosition; callers such as ReadCommandVerbHandler treat null as "not a replica" */ + return getNaturalReplicas(searchPosition) + .byEndpoint() + .get(FBUtilities.getBroadcastAddressAndPort()); + } + /** * calculate the natural endpoints for the given token * http://git-wip-us.apache.org/repos/asf/cassandra/blob/2046c30a/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git
a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 9d9c628..5eb43cf 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -1919,7 +1919,7 @@ public class StorageProxy implements StorageProxyMBean EndpointsForRange targetReplicas = consistency.filterForQuery(keyspace, liveReplicas); int minResponses = Math.min(targetReplicas.size(), blockFor); - // Endpoitns for range here as well + // Endpoints for range here as well return ReplicaLayout.forRangeRead(keyspace, consistency, range, liveReplicas, targetReplicas.subList(0, minResponses)); } @@ -2146,9 +2146,10 @@ public class StorageProxy implements StorageProxyMBean for (Replica replica : replicaLayout.selected()) { Tracing.trace("Enqueuing request to {}", replica); - MessageOut<ReadCommand> message = rangeCommand.createMessage(); + PartitionRangeReadCommand command = replica.isFull() ? rangeCommand : rangeCommand.copyAsTransientQuery(); + MessageOut<ReadCommand> message = command.createMessage(); if (command.isTrackingRepairedStatus() && replica.isFull()) - message = message.withParameter(ParameterType.TRACK_REPAIRED_DATA, MessagingService.ONE_BYTE); + message = message.withParameter(ParameterType.TRACK_REPAIRED_DATA, MessagingService.ONE_BYTE); MessagingService.instance().sendRRWithFailure(message, replica.endpoint(), handler); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/2046c30a/test/unit/org/apache/cassandra/SchemaLoader.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/SchemaLoader.java b/test/unit/org/apache/cassandra/SchemaLoader.java index 62b9670..41f8095 100644 --- a/test/unit/org/apache/cassandra/SchemaLoader.java +++ b/test/unit/org/apache/cassandra/SchemaLoader.java @@ -102,6 +102,8 @@ public class SchemaLoader String ks_nocommit = testName + "NoCommitlogSpace"; String 
ks_prsi = testName + "PerRowSecondaryIndex"; String ks_cql = testName + "cql_keyspace"; + String ks_cql_replicated = testName + "cql_keyspace_replicated"; + String ks_with_transient = testName + "ks_with_transient"; AbstractType bytes = BytesType.instance; @@ -218,16 +220,16 @@ public class SchemaLoader schema.add(KeyspaceMetadata.create(ks_nocommit, KeyspaceParams.simpleTransient(1), Tables.of( standardCFMD(ks_nocommit, "Standard1").build()))); + String simpleTable = "CREATE TABLE table1 (" + + "k int PRIMARY KEY," + + "v1 text," + + "v2 int" + + ")"; // CQLKeyspace schema.add(KeyspaceMetadata.create(ks_cql, KeyspaceParams.simple(1), Tables.of( // Column Families - CreateTableStatement.parse("CREATE TABLE table1 (" - + "k int PRIMARY KEY," - + "v1 text," - + "v2 int" - + ")", ks_cql) - .build(), + CreateTableStatement.parse(simpleTable, ks_cql).build(), CreateTableStatement.parse("CREATE TABLE table2 (" + "k text," @@ -237,6 +239,12 @@ public class SchemaLoader .build() ))); + schema.add(KeyspaceMetadata.create(ks_cql_replicated, KeyspaceParams.simple(3), + Tables.of(CreateTableStatement.parse(simpleTable, ks_cql_replicated).build()))); + + schema.add(KeyspaceMetadata.create(ks_with_transient, KeyspaceParams.simple("3/1"), + Tables.of(CreateTableStatement.parse(simpleTable, ks_with_transient).build()))); + if (DatabaseDescriptor.getPartitioner() instanceof Murmur3Partitioner) { schema.add(KeyspaceMetadata.create("sasi", http://git-wip-us.apache.org/repos/asf/cassandra/blob/2046c30a/test/unit/org/apache/cassandra/cql3/CQLTester.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java index adadb9c..e6b0e29 100644 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@ -190,6 +190,7 @@ public abstract class CQLTester return; DatabaseDescriptor.daemonInitialization(); + 
DatabaseDescriptor.setTransientReplicationEnabledUnsafe(true); // Cleanup first try http://git-wip-us.apache.org/repos/asf/cassandra/blob/2046c30a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java index 0c43661..b7e053b 100644 --- a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java +++ b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerTest.java @@ -29,11 +29,14 @@ import org.junit.BeforeClass; import org.junit.Test; import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.filter.ClusteringIndexSliceFilter; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.net.IMessageSink; import org.apache.cassandra.net.MessageIn; import org.apache.cassandra.net.MessageOut; @@ -41,27 +44,41 @@ import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.ParameterType; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.FBUtilities; +import static org.apache.cassandra.Util.token; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; public class ReadCommandVerbHandlerTest { + private final static Random random = new Random(); + + private static ReadCommandVerbHandler handler; + private static TableMetadata metadata; + private static 
TableMetadata metadata_with_transient; + private static DecoratedKey KEY; + private static final String TEST_NAME = "read_command_vh_test_"; - private static final String KEYSPACE = TEST_NAME + "cql_keyspace"; + private static final String KEYSPACE = TEST_NAME + "cql_keyspace_replicated"; + private static final String KEYSPACE_WITH_TRANSIENT = TEST_NAME + "ks_with_transient"; private static final String TABLE = "table1"; - private final Random random = new Random(); - private ReadCommandVerbHandler handler; - private TableMetadata metadata; - @BeforeClass - public static void init() + public static void init() throws Throwable { SchemaLoader.loadSchema(); SchemaLoader.schemaDefinition(TEST_NAME); + metadata = Schema.instance.getTableMetadata(KEYSPACE, TABLE); + metadata_with_transient = Schema.instance.getTableMetadata(KEYSPACE_WITH_TRANSIENT, TABLE); + KEY = key(metadata, 1); + + TokenMetadata tmd = StorageService.instance.getTokenMetadata(); + tmd.updateNormalToken(KEY.getToken(), InetAddressAndPort.getByName("127.0.0.2")); + tmd.updateNormalToken(key(metadata, 2).getToken(), InetAddressAndPort.getByName("127.0.0.3")); + tmd.updateNormalToken(key(metadata, 3).getToken(), FBUtilities.getBroadcastAddressAndPort()); } @Before @@ -81,14 +98,13 @@ public class ReadCommandVerbHandlerTest } }); - metadata = Schema.instance.getTableMetadata(KEYSPACE, TABLE); handler = new ReadCommandVerbHandler(); } @Test public void setRepairedDataTrackingFlagIfHeaderPresent() { - ReadCommand command = command(key()); + SinglePartitionReadCommand command = command(metadata); assertFalse(command.isTrackingRepairedStatus()); Map<ParameterType, Object> params = ImmutableMap.of(ParameterType.TRACK_REPAIRED_DATA, MessagingService.ONE_BYTE); @@ -104,7 +120,7 @@ public class ReadCommandVerbHandlerTest @Test public void dontSetRepairedDataTrackingFlagUnlessHeaderPresent() { - ReadCommand command = command(key()); + SinglePartitionReadCommand command = command(metadata); 
assertFalse(command.isTrackingRepairedStatus()); Map<ParameterType, Object> params = ImmutableMap.of(ParameterType.TRACE_SESSION, UUID.randomUUID()); @@ -120,7 +136,7 @@ public class ReadCommandVerbHandlerTest @Test public void dontSetRepairedDataTrackingFlagIfHeadersEmpty() { - ReadCommand command = command(key()); + SinglePartitionReadCommand command = command(metadata); assertFalse(command.isTrackingRepairedStatus()); handler.doVerb(MessageIn.create(peer(), command, @@ -131,17 +147,24 @@ public class ReadCommandVerbHandlerTest assertFalse(command.isTrackingRepairedStatus()); } - private int key() + @Test (expected = InvalidRequestException.class) + public void rejectsRequestWithNonMatchingTransientness() { - return random.nextInt(); + SinglePartitionReadCommand command = command(metadata_with_transient); + handler.doVerb(MessageIn.create(peer(), + command, + ImmutableMap.of(), + MessagingService.Verb.READ, + MessagingService.current_version), + messageId()); } - private int messageId() + private static int messageId() { return random.nextInt(); } - private InetAddressAndPort peer() + private static InetAddressAndPort peer() { try { @@ -153,19 +176,23 @@ public class ReadCommandVerbHandlerTest } } - private ReadCommand command(int key) + private static SinglePartitionReadCommand command(TableMetadata metadata) { return new SinglePartitionReadCommand(false, - 0, - false, - metadata, - FBUtilities.nowInSeconds(), - ColumnFilter.all(metadata), - RowFilter.NONE, - DataLimits.NONE, - metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)), - new ClusteringIndexSliceFilter(Slices.ALL, false), - null); + 0, + false, + metadata, + FBUtilities.nowInSeconds(), + ColumnFilter.all(metadata), + RowFilter.NONE, + DataLimits.NONE, + KEY, + new ClusteringIndexSliceFilter(Slices.ALL, false), + null); } + private static DecoratedKey key(TableMetadata metadata, int key) + { + return metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)); + } } 
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
