This is an automated email from the ASF dual-hosted git repository. maedhroz pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 58f0e2146b3d7f0bd09c4dbd9d1da5e43f37ed73 Merge: cd347bdc58 f7984627e7 Author: Caleb Rackliffe <[email protected]> AuthorDate: Thu Feb 29 15:04:19 2024 -0600 Merge branch 'cassandra-5.0' into trunk * cassandra-5.0: Avoid possible consistency violations for SAI intersection queries over partially updated rows at consistency levels that require reconciliation CHANGES.txt | 1 + .../cql3/restrictions/StatementRestrictions.java | 6 +- .../cassandra/cql3/statements/SelectStatement.java | 9 +- .../cassandra/db/AbstractReadCommandBuilder.java | 2 +- .../org/apache/cassandra/db/ConsistencyLevel.java | 15 +- src/java/org/apache/cassandra/db/ReadCommand.java | 34 +- .../db/compaction/CompactionIterator.java | 29 +- .../org/apache/cassandra/db/filter/RowFilter.java | 243 +++++----- .../cassandra/db/rows/UnfilteredRowIterators.java | 33 +- .../apache/cassandra/index/sai/QueryContext.java | 14 +- .../cassandra/index/sai/StorageAttachedIndex.java | 17 +- .../index/sai/disk/IndexSearchResultIterator.java | 14 +- .../index/sai/disk/PerColumnIndexWriter.java | 4 +- .../cassandra/index/sai/disk/RowMapping.java | 39 +- .../index/sai/disk/v1/MemtableIndexWriter.java | 38 +- .../index/sai/iterators/KeyRangeIterator.java | 2 +- .../index/sai/iterators/KeyRangeUnionIterator.java | 38 +- .../cassandra/index/sai/plan/FilterTree.java | 58 ++- .../apache/cassandra/index/sai/plan/Operation.java | 32 +- .../cassandra/index/sai/plan/QueryController.java | 95 +++- .../cassandra/index/sai/plan/QueryViewBuilder.java | 78 +--- .../sai/plan/StorageAttachedIndexQueryPlan.java | 30 +- .../sai/plan/StorageAttachedIndexSearcher.java | 68 +-- .../cassandra/service/reads/DataResolver.java | 55 ++- .../service/reads/ReplicaFilteringProtection.java | 86 ++-- .../reads/repair/RowIteratorMergeListener.java | 10 +- .../test/ReplicaFilteringProtectionTest.java | 78 ++-- .../test/sai/ConcurrencyFactorTest.java | 9 +- .../test/sai/PartialUpdateHandlingTest.java | 494 +++++++++++++++++++++ .../test/sai/ReplicaFilteringProtectionTest.java | 67 --- .../distributed/test/sai/StrictFilteringTest.java | 227 ++++++++++ test/unit/org/apache/cassandra/db/CleanupTest.java | 2 +- .../org/apache/cassandra/db/ReadCommandTest.java | 6 +- .../db/ReadCommandVerbHandlerOutOfRangeTest.java | 4 +- .../cql/intersection/RandomIntersectionTester.java | 6 +- .../cassandra/index/sai/plan/OperationTest.java | 86 ++-- .../apache/cassandra/index/sasi/SASIIndexTest.java | 4 +- 37 files changed, 1408 insertions(+), 625 deletions(-) diff --cc CHANGES.txt index 24da866a5f,9808945582..fbbc57216b --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,24 -1,5 +1,25 @@@ -5.0-beta2 +5.1 + * Refactor cqlshmain global constants (CASSANDRA-19201) + * Remove native_transport_port_ssl (CASSANDRA-19397) + * Make nodetool reconfigurecms sync by default and add --cancel to be able to cancel ongoing reconfigurations (CASSANDRA-19216) + * Expose auth mode in system_views.clients, nodetool clientstats, metrics (CASSANDRA-19366) + * Remove sealed_periods and last_sealed_period tables (CASSANDRA-19189) + * Improve setup and initialisation of LocalLog/LogSpec (CASSANDRA-19271) + * Refactor structure of caching metrics and expose auth cache metrics via JMX (CASSANDRA-17062) + * Allow CQL client certificate authentication to work without sending an AUTHENTICATE request (CASSANDRA-18857) + * Extend nodetool tpstats and system_views.thread_pools with detailed pool parameters (CASSANDRA-19289) + * Remove dependency on Sigar in favor of OSHI (CASSANDRA-16565) + * Simplify the bind marker and Term logic (CASSANDRA-18813) + * Limit cassandra startup to supported JDKs, allow higher JDKs by setting CASSANDRA_JDK_UNSUPPORTED (CASSANDRA-18688) + * Standardize nodetool tablestats formatting of data units (CASSANDRA-19104) + * Make nodetool tablestats use number of significant digits for time and average values consistently (CASSANDRA-19015) + * Upgrade jackson to 2.15.3 and snakeyaml to 2.1 (CASSANDRA-18875) + * Transactional Cluster Metadata [CEP-21] (CASSANDRA-18330) + * Add ELAPSED command to cqlsh (CASSANDRA-18861) + * Add the ability to disable bulk loading of SSTables (CASSANDRA-18781) + * Clean up obsolete functions and simplify cql_version handling in cqlsh (CASSANDRA-18787) +Merged from 5.0: + * Avoid consistency violations for SAI intersections over unrepaired data at consistency levels requiring reconciliation (CASSANDRA-19018) * Fix NullPointerException in ANN+WHERE when adding rows in another partition (CASSANDRA-19404) * Record latencies for SAI post-filtering reads against local storage (CASSANDRA-18940) * Fix VectorMemoryIndex#update logic to compare vectors. Fix Index view (CASSANDRA-19168) diff --cc src/java/org/apache/cassandra/db/ReadCommand.java index bb2b856d12,6d3447a366..c279f816fb --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@@ -1030,11 -1011,12 +1030,17 @@@ public abstract class ReadCommand exten @VisibleForTesting public static class Serializer implements IVersionedSerializer<ReadCommand> { + private static final NoSpamLogger noSpamLogger = NoSpamLogger.getLogger(logger, 10L, TimeUnit.SECONDS); + private static final NoSpamLogger.NoSpamLogStatement schemaMismatchStmt = + noSpamLogger.getStatement("Schema epoch mismatch during read command deserialization. " + + "TableId: {}, remote epoch: {}, local epoch: {}", 10L, TimeUnit.SECONDS); + + private static final int IS_DIGEST = 0x01; + private static final int IS_FOR_THRIFT = 0x02; + private static final int HAS_INDEX = 0x04; + private static final int ACCEPTS_TRANSIENT = 0x08; + private static final int NEEDS_RECONCILIATION = 0x10; + private final SchemaProvider schema; public Serializer() @@@ -1124,37 -1115,20 +1141,38 @@@ // better complain loudly than doing the wrong thing. if (isForThrift(flags)) throw new IllegalStateException("Received a command with the thrift flag set. " - + "This means thrift is in use in a mixed 3.0/3.X and 4.0+ cluster, " - + "which is unsupported. Make sure to stop using thrift before " - + "upgrading to 4.0"); + + "This means thrift is in use in a mixed 3.0/3.X and 4.0+ cluster, " + + "which is unsupported. Make sure to stop using thrift before " + + "upgrading to 4.0"); boolean hasIndex = hasIndex(flags); - int digestVersion = isDigest ? in.readUnsignedVInt32() : 0; + int digestVersion = isDigest ? (int)in.readUnsignedVInt() : 0; + boolean needsReconciliation = needsReconciliation(flags); + TableId tableId = TableId.deserialize(in); - TableMetadata metadata = schema.getExistingTableMetadata(TableId.deserialize(in)); + Epoch schemaVersion = Epoch.EMPTY; + if (version >= MessagingService.VERSION_51) + schemaVersion = Epoch.serializer.deserialize(in); + TableMetadata tableMetadata; + try + { + tableMetadata = schema.getExistingTableMetadata(tableId); + } + catch (UnknownTableException e) + { + ClusterMetadata metadata = ClusterMetadata.current(); + Epoch localCurrentEpoch = metadata.epoch; + if (schemaVersion != null && localCurrentEpoch.isAfter(schemaVersion)) + { + TCMMetrics.instance.coordinatorBehindSchema.mark(); + throw new CoordinatorBehindException(e.getMessage()); + } + throw e; + } long nowInSec = version >= MessagingService.VERSION_50 ? CassandraUInt.toLong(in.readInt()) : in.readInt(); - ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata); - RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata, needsReconciliation); - DataLimits limits = DataLimits.serializer.deserialize(in, version, metadata); - + ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, tableMetadata); - RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, tableMetadata); ++ RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, tableMetadata, needsReconciliation); + DataLimits limits = DataLimits.serializer.deserialize(in, version, tableMetadata); Index.QueryPlan indexQueryPlan = null; if (hasIndex) { diff --cc test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerOutOfRangeTest.java index 3fd3c98e3e,0000000000..0b167a88f1 mode 100644,000000..100644 --- a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerOutOfRangeTest.java +++ b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerOutOfRangeTest.java @@@ -1,272 -1,0 +1,272 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import com.google.common.util.concurrent.ListenableFuture; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.ServerTestUtils; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper; +import org.apache.cassandra.exceptions.InvalidRoutingException; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.metrics.StorageMetrics; +import org.apache.cassandra.net.Message; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.net.Verb; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.membership.NodeState; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper.*; +import static org.apache.cassandra.net.Verb.READ_REQ; +import static org.junit.Assert.assertEquals; + +public class ReadCommandVerbHandlerOutOfRangeTest +{ + private static ReadCommandVerbHandler handler; + private static TableMetadata metadata_nonreplicated; + private ColumnFamilyStore cfs; + private long startingTotalMetricCount; + private long startingKeyspaceMetricCount; + + private static final String TEST_NAME = "read_command_vh_test_"; + private static final String KEYSPACE_NONREPLICATED = TEST_NAME + "cql_keyspace"; + private static final String TABLE = "table1"; + + @BeforeClass + public static void init() throws Throwable + { + ServerTestUtils.prepareServerNoRegister(); + SchemaLoader.schemaDefinition(TEST_NAME); + ServerTestUtils.markCMS(); + metadata_nonreplicated = Schema.instance.getTableMetadata(KEYSPACE_NONREPLICATED, TABLE); + StorageService.instance.unsafeSetInitialized(); + } + + @Before + public void setup() + { + ServerTestUtils.resetCMS(); + ClusterMetadataTestHelper.addEndpoint(broadcastAddress, bytesToken(100)); + ClusterMetadataTestHelper.addEndpoint(node1, bytesToken(0)); + + MessagingService.instance().inboundSink.clear(); + MessagingService.instance().outboundSink.clear(); + MessagingService.instance().outboundSink.add((message, to) -> false); + MessagingService.instance().inboundSink.add((message) -> false); + + cfs = Keyspace.open(KEYSPACE_NONREPLICATED).getColumnFamilyStore(TABLE); + startingKeyspaceMetricCount = keyspaceMetricValue(cfs); + startingTotalMetricCount = StorageMetrics.totalOpsForInvalidToken.getCount(); + handler = new ReadCommandVerbHandler(); + } + + private static DecoratedKey key(TableMetadata metadata, int key) + { + return metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key)); + } + + @Test + public void acceptPartitionReadForNaturalEndpoint() throws Exception + { + verifyPartitionReadAccepted(); + } + + @Test + public void acceptPartitionReadForPendingEndpoint() throws Exception + { + // reset ClusterMetadata then join the remote node and partially + // join the localhost one to emulate pending ranges + ServerTestUtils.resetCMS(); + ClusterMetadataTestHelper.addEndpoint(node1, bytesToken(0)); + ClusterMetadataTestHelper.register(broadcastAddress); + ClusterMetadataTestHelper.joinPartially(broadcastAddress, bytesToken(100)); + assertEquals(NodeState.BOOTSTRAPPING, ClusterMetadata.current().directory.peerState(broadcastAddress)); + + verifyPartitionReadAccepted(); + } + + private void verifyPartitionReadAccepted() throws Exception + { + ListenableFuture<MessageDelivery> messageSink = registerOutgoingMessageSink(); + int messageId = randomInt(); + int key = 50; + ReadCommand command = partitionRead(key); + handler.doVerb(Message.builder(READ_REQ, command).from(node1).withId(messageId).build()); + getAndVerifyResponse(messageSink, messageId, false); + } + + @Test(expected = InvalidRoutingException.class) + public void rejectPartitionReadForTokenOutOfRange() + { + // reject a read for a key who's token the node doesn't own the range for + int messageId = randomInt(); + int key = 200; + ReadCommand command = partitionRead(key); + handler.doVerb(Message.builder(READ_REQ, command).from(node1).withId(messageId).build()); + // we automatically send a failure response if doVerb above throws + } + + @Test + public void acceptRangeReadForNaturalEndpoint() throws Exception + { + // reset ClusterMetadata then join the remote node and partially + // join the localhost one to emulate pending ranges + ServerTestUtils.resetCMS(); + ClusterMetadataTestHelper.addEndpoint(node1, bytesToken(0)); + ClusterMetadataTestHelper.register(broadcastAddress); + ClusterMetadataTestHelper.joinPartially(broadcastAddress, bytesToken(100)); + assertEquals(NodeState.BOOTSTRAPPING, ClusterMetadata.current().directory.peerState(broadcastAddress)); + + verifyRangeReadAccepted(); + } + + @Test + public void acceptRangeReadForPendingEndpoint() throws Exception + { + verifyRangeReadAccepted(); + } + + private void verifyRangeReadAccepted() throws Exception + { + ListenableFuture<MessageDelivery> messageSink = registerOutgoingMessageSink(); + int messageId = randomInt(); + ReadCommand command = rangeRead(50, 60); + handler.doVerb(Message.builder(READ_REQ, command).from(node1).withId(messageId).build()); + getAndVerifyResponse(messageSink, messageId, false); + } + + @Test(expected = InvalidRoutingException.class) + public void rejectRangeReadForUnownedRange() throws Exception + { + int messageId = randomInt(); + ReadCommand command = rangeRead(150, 160); + handler.doVerb(Message.builder(READ_REQ, command).from(node1).withId(messageId).build()); + } + + private void getAndVerifyResponse(ListenableFuture<MessageDelivery> messageSink, + int messageId, + boolean isOutOfRange) throws InterruptedException, ExecutionException, TimeoutException + { + MessageDelivery response = messageSink.get(100, TimeUnit.MILLISECONDS); + assertEquals(isOutOfRange ? Verb.FAILURE_RSP : Verb.READ_RSP, response.message.verb()); + assertEquals(broadcastAddress, response.message.from()); + assertEquals(isOutOfRange, response.message.payload instanceof RequestFailureReason); + assertEquals(messageId, response.message.id()); + assertEquals(node1, response.to); + assertEquals(startingTotalMetricCount + (isOutOfRange ? 1 : 0), StorageMetrics.totalOpsForInvalidToken.getCount()); + assertEquals(startingKeyspaceMetricCount + (isOutOfRange ? 1 : 0), keyspaceMetricValue(cfs)); + } + + private ReadCommand partitionRead(int key) + { + return new StubReadCommand(key, metadata_nonreplicated); + } + + private ReadCommand rangeRead(int start, int end) + { + Range<Token> range = new Range<>(key(metadata_nonreplicated, start).getToken(), + key(metadata_nonreplicated, end).getToken()); + return new StubRangeReadCommand(range, metadata_nonreplicated); + } + + private static class StubReadCommand extends SinglePartitionReadCommand + { + private final TableMetadata tmd; + + StubReadCommand(int key, TableMetadata tmd) + { + super(tmd.epoch, + false, + 0, + false, + tmd, + FBUtilities.nowInSeconds(), + ColumnFilter.all(tmd), - RowFilter.NONE, ++ RowFilter.none(), + DataLimits.NONE, + key(tmd, key), + null, + null, + false); + + this.tmd = tmd; + } + + public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController) + { + return EmptyIterators.unfilteredPartition(tmd); + } + + public String toString() + { + return "<<StubReadCommand>>"; + } + } + + private static class StubRangeReadCommand extends PartitionRangeReadCommand + { + private final TableMetadata cfm; + + StubRangeReadCommand(Range<Token> range, TableMetadata tmd) + { + super(tmd.epoch, + false, + 0, + false, + tmd, + FBUtilities.nowInSeconds(), + ColumnFilter.all(tmd), - RowFilter.NONE, ++ RowFilter.none(), + DataLimits.NONE, + DataRange.forTokenRange(range), + null, + false); + + this.cfm = tmd; + } + + public UnfilteredPartitionIterator executeLocally(ReadExecutionController executionController) + { + return EmptyIterators.unfilteredPartition(cfm); + } + } + + private static long keyspaceMetricValue(ColumnFamilyStore cfs) + { + return cfs.keyspace.metric.outOfRangeTokenReads.getCount(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
