This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 58f0e2146b3d7f0bd09c4dbd9d1da5e43f37ed73
Merge: cd347bdc58 f7984627e7
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Thu Feb 29 15:04:19 2024 -0600

    Merge branch 'cassandra-5.0' into trunk
    
    * cassandra-5.0:
      Avoid possible consistency violations for SAI intersection queries over partially updated rows at consistency levels that require reconciliation

 CHANGES.txt                                        |   1 +
 .../cql3/restrictions/StatementRestrictions.java   |   6 +-
 .../cassandra/cql3/statements/SelectStatement.java |   9 +-
 .../cassandra/db/AbstractReadCommandBuilder.java   |   2 +-
 .../org/apache/cassandra/db/ConsistencyLevel.java  |  15 +-
 src/java/org/apache/cassandra/db/ReadCommand.java  |  34 +-
 .../db/compaction/CompactionIterator.java          |  29 +-
 .../org/apache/cassandra/db/filter/RowFilter.java  | 243 +++++-----
 .../cassandra/db/rows/UnfilteredRowIterators.java  |  33 +-
 .../apache/cassandra/index/sai/QueryContext.java   |  14 +-
 .../cassandra/index/sai/StorageAttachedIndex.java  |  17 +-
 .../index/sai/disk/IndexSearchResultIterator.java  |  14 +-
 .../index/sai/disk/PerColumnIndexWriter.java       |   4 +-
 .../cassandra/index/sai/disk/RowMapping.java       |  39 +-
 .../index/sai/disk/v1/MemtableIndexWriter.java     |  38 +-
 .../index/sai/iterators/KeyRangeIterator.java      |   2 +-
 .../index/sai/iterators/KeyRangeUnionIterator.java |  38 +-
 .../cassandra/index/sai/plan/FilterTree.java       |  58 ++-
 .../apache/cassandra/index/sai/plan/Operation.java |  32 +-
 .../cassandra/index/sai/plan/QueryController.java  |  95 +++-
 .../cassandra/index/sai/plan/QueryViewBuilder.java |  78 +---
 .../sai/plan/StorageAttachedIndexQueryPlan.java    |  30 +-
 .../sai/plan/StorageAttachedIndexSearcher.java     |  68 +--
 .../cassandra/service/reads/DataResolver.java      |  55 ++-
 .../service/reads/ReplicaFilteringProtection.java  |  86 ++--
 .../reads/repair/RowIteratorMergeListener.java     |  10 +-
 .../test/ReplicaFilteringProtectionTest.java       |  78 ++--
 .../test/sai/ConcurrencyFactorTest.java            |   9 +-
 .../test/sai/PartialUpdateHandlingTest.java        | 494 +++++++++++++++++++++
 .../test/sai/ReplicaFilteringProtectionTest.java   |  67 ---
 .../distributed/test/sai/StrictFilteringTest.java  | 227 ++++++++++
 test/unit/org/apache/cassandra/db/CleanupTest.java |   2 +-
 .../org/apache/cassandra/db/ReadCommandTest.java   |   6 +-
 .../db/ReadCommandVerbHandlerOutOfRangeTest.java   |   4 +-
 .../cql/intersection/RandomIntersectionTester.java |   6 +-
 .../cassandra/index/sai/plan/OperationTest.java    |  86 ++--
 .../apache/cassandra/index/sasi/SASIIndexTest.java |   4 +-
 37 files changed, 1408 insertions(+), 625 deletions(-)

diff --cc CHANGES.txt
index 24da866a5f,9808945582..fbbc57216b
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,24 -1,5 +1,25 @@@
 -5.0-beta2
 +5.1
 + * Refactor cqlshmain global constants (CASSANDRA-19201)
 + * Remove native_transport_port_ssl (CASSANDRA-19397)
 + * Make nodetool reconfigurecms sync by default and add --cancel to be able 
to cancel ongoing reconfigurations (CASSANDRA-19216)
 + * Expose auth mode in system_views.clients, nodetool clientstats, metrics 
(CASSANDRA-19366)
 + * Remove sealed_periods and last_sealed_period tables (CASSANDRA-19189)
 + * Improve setup and initialisation of LocalLog/LogSpec (CASSANDRA-19271)
 + * Refactor structure of caching metrics and expose auth cache metrics via 
JMX (CASSANDRA-17062)
 + * Allow CQL client certificate authentication to work without sending an 
AUTHENTICATE request (CASSANDRA-18857)
 + * Extend nodetool tpstats and system_views.thread_pools with detailed pool 
parameters (CASSANDRA-19289) 
 + * Remove dependency on Sigar in favor of OSHI (CASSANDRA-16565)
 + * Simplify the bind marker and Term logic (CASSANDRA-18813)
 + * Limit cassandra startup to supported JDKs, allow higher JDKs by setting 
CASSANDRA_JDK_UNSUPPORTED (CASSANDRA-18688)
 + * Standardize nodetool tablestats formatting of data units (CASSANDRA-19104)
 + * Make nodetool tablestats use number of significant digits for time and 
average values consistently (CASSANDRA-19015)
 + * Upgrade jackson to 2.15.3 and snakeyaml to 2.1 (CASSANDRA-18875)
 + * Transactional Cluster Metadata [CEP-21] (CASSANDRA-18330)
 + * Add ELAPSED command to cqlsh (CASSANDRA-18861)
 + * Add the ability to disable bulk loading of SSTables (CASSANDRA-18781)
 + * Clean up obsolete functions and simplify cql_version handling in cqlsh 
(CASSANDRA-18787)
 +Merged from 5.0:
+  * Avoid consistency violations for SAI intersections over unrepaired data at 
consistency levels requiring reconciliation (CASSANDRA-19018)
   * Fix NullPointerException in ANN+WHERE when adding rows in another 
partition (CASSANDRA-19404)
   * Record latencies for SAI post-filtering reads against local storage 
(CASSANDRA-18940)
   * Fix VectorMemoryIndex#update logic to compare vectors. Fix Index view 
(CASSANDRA-19168)
diff --cc src/java/org/apache/cassandra/db/ReadCommand.java
index bb2b856d12,6d3447a366..c279f816fb
--- a/src/java/org/apache/cassandra/db/ReadCommand.java
+++ b/src/java/org/apache/cassandra/db/ReadCommand.java
@@@ -1030,11 -1011,12 +1030,17 @@@ public abstract class ReadCommand exten
      @VisibleForTesting
      public static class Serializer implements 
IVersionedSerializer<ReadCommand>
      {
 +        private static final NoSpamLogger noSpamLogger = 
NoSpamLogger.getLogger(logger, 10L, TimeUnit.SECONDS);
 +        private static final NoSpamLogger.NoSpamLogStatement 
schemaMismatchStmt =
 +            noSpamLogger.getStatement("Schema epoch mismatch during read 
command deserialization. " +
 +                                      "TableId: {}, remote epoch: {}, local 
epoch: {}", 10L, TimeUnit.SECONDS);
 +
+         private static final int IS_DIGEST = 0x01;
+         private static final int IS_FOR_THRIFT = 0x02;
+         private static final int HAS_INDEX = 0x04;
+         private static final int ACCEPTS_TRANSIENT = 0x08;
+         private static final int NEEDS_RECONCILIATION = 0x10;
+ 
          private final SchemaProvider schema;
  
          public Serializer()
@@@ -1124,37 -1115,20 +1141,38 @@@
              // better complain loudly than doing the wrong thing.
              if (isForThrift(flags))
                  throw new IllegalStateException("Received a command with the 
thrift flag set. "
 -                                              + "This means thrift is in use 
in a mixed 3.0/3.X and 4.0+ cluster, "
 -                                              + "which is unsupported. Make 
sure to stop using thrift before "
 -                                              + "upgrading to 4.0");
 +                                                + "This means thrift is in 
use in a mixed 3.0/3.X and 4.0+ cluster, "
 +                                                + "which is unsupported. Make 
sure to stop using thrift before "
 +                                                + "upgrading to 4.0");
  
              boolean hasIndex = hasIndex(flags);
 -            int digestVersion = isDigest ? in.readUnsignedVInt32() : 0;
 +            int digestVersion = isDigest ? (int)in.readUnsignedVInt() : 0;
+             boolean needsReconciliation = needsReconciliation(flags);
 +            TableId tableId = TableId.deserialize(in);
  
 -            TableMetadata metadata = 
schema.getExistingTableMetadata(TableId.deserialize(in));
 +            Epoch schemaVersion = Epoch.EMPTY;
 +            if (version >= MessagingService.VERSION_51)
 +                schemaVersion = Epoch.serializer.deserialize(in);
 +            TableMetadata tableMetadata;
 +            try
 +            {
 +                tableMetadata = schema.getExistingTableMetadata(tableId);
 +            }
 +            catch (UnknownTableException e)
 +            {
 +                ClusterMetadata metadata = ClusterMetadata.current();
 +                Epoch localCurrentEpoch = metadata.epoch;
 +                if (schemaVersion != null && 
localCurrentEpoch.isAfter(schemaVersion))
 +                {
 +                    TCMMetrics.instance.coordinatorBehindSchema.mark();
 +                    throw new CoordinatorBehindException(e.getMessage());
 +                }
 +                throw e;
 +            }
              long nowInSec = version >= MessagingService.VERSION_50 ? 
CassandraUInt.toLong(in.readInt()) : in.readInt();
 -            ColumnFilter columnFilter = 
ColumnFilter.serializer.deserialize(in, version, metadata);
 -            RowFilter rowFilter = RowFilter.serializer.deserialize(in, 
version, metadata, needsReconciliation);
 -            DataLimits limits = DataLimits.serializer.deserialize(in, 
version,  metadata);
 -
 +            ColumnFilter columnFilter = 
ColumnFilter.serializer.deserialize(in, version, tableMetadata);
-             RowFilter rowFilter = RowFilter.serializer.deserialize(in, 
version, tableMetadata);
++            RowFilter rowFilter = RowFilter.serializer.deserialize(in, 
version, tableMetadata, needsReconciliation);
 +            DataLimits limits = DataLimits.serializer.deserialize(in, 
version,  tableMetadata);
              Index.QueryPlan indexQueryPlan = null;
              if (hasIndex)
              {
diff --cc test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerOutOfRangeTest.java
index 3fd3c98e3e,0000000000..0b167a88f1
mode 100644,000000..100644
--- a/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerOutOfRangeTest.java
+++ b/test/unit/org/apache/cassandra/db/ReadCommandVerbHandlerOutOfRangeTest.java
@@@ -1,272 -1,0 +1,272 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.db;
 +
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.TimeoutException;
 +
 +import com.google.common.util.concurrent.ListenableFuture;
 +import org.junit.Before;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +
 +import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.ServerTestUtils;
 +import org.apache.cassandra.db.filter.ColumnFilter;
 +import org.apache.cassandra.db.filter.DataLimits;
 +import org.apache.cassandra.db.filter.RowFilter;
 +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
 +import org.apache.cassandra.dht.Range;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
 +import org.apache.cassandra.exceptions.InvalidRoutingException;
 +import org.apache.cassandra.exceptions.RequestFailureReason;
 +import org.apache.cassandra.metrics.StorageMetrics;
 +import org.apache.cassandra.net.Message;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.net.Verb;
 +import org.apache.cassandra.schema.Schema;
 +import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.tcm.ClusterMetadata;
 +import org.apache.cassandra.tcm.membership.NodeState;
 +import org.apache.cassandra.utils.ByteBufferUtil;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +import static 
org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper.*;
 +import static org.apache.cassandra.net.Verb.READ_REQ;
 +import static org.junit.Assert.assertEquals;
 +
 +public class ReadCommandVerbHandlerOutOfRangeTest
 +{
 +    private static ReadCommandVerbHandler handler;
 +    private static TableMetadata metadata_nonreplicated;
 +    private ColumnFamilyStore cfs;
 +    private long startingTotalMetricCount;
 +    private long startingKeyspaceMetricCount;
 +
 +    private static final String TEST_NAME = "read_command_vh_test_";
 +    private static final String KEYSPACE_NONREPLICATED = TEST_NAME + 
"cql_keyspace";
 +    private static final String TABLE = "table1";
 +
 +    @BeforeClass
 +    public static void init() throws Throwable
 +    {
 +        ServerTestUtils.prepareServerNoRegister();
 +        SchemaLoader.schemaDefinition(TEST_NAME);
 +        ServerTestUtils.markCMS();
 +        metadata_nonreplicated = 
Schema.instance.getTableMetadata(KEYSPACE_NONREPLICATED, TABLE);
 +        StorageService.instance.unsafeSetInitialized();
 +    }
 +
 +    @Before
 +    public void setup()
 +    {
 +        ServerTestUtils.resetCMS();
 +        ClusterMetadataTestHelper.addEndpoint(broadcastAddress, 
bytesToken(100));
 +        ClusterMetadataTestHelper.addEndpoint(node1, bytesToken(0));
 +
 +        MessagingService.instance().inboundSink.clear();
 +        MessagingService.instance().outboundSink.clear();
 +        MessagingService.instance().outboundSink.add((message, to) -> false);
 +        MessagingService.instance().inboundSink.add((message) -> false);
 +
 +        cfs = 
Keyspace.open(KEYSPACE_NONREPLICATED).getColumnFamilyStore(TABLE);
 +        startingKeyspaceMetricCount = keyspaceMetricValue(cfs);
 +        startingTotalMetricCount = 
StorageMetrics.totalOpsForInvalidToken.getCount();
 +        handler = new ReadCommandVerbHandler();
 +    }
 +
 +    private static DecoratedKey key(TableMetadata metadata, int key)
 +    {
 +        return metadata.partitioner.decorateKey(ByteBufferUtil.bytes(key));
 +    }
 +
 +    @Test
 +    public void acceptPartitionReadForNaturalEndpoint() throws Exception
 +    {
 +        verifyPartitionReadAccepted();
 +    }
 +
 +    @Test
 +    public void acceptPartitionReadForPendingEndpoint() throws Exception
 +    {
 +        // reset ClusterMetadata then join the remote node and partially
 +        // join the localhost one to emulate pending ranges
 +        ServerTestUtils.resetCMS();
 +        ClusterMetadataTestHelper.addEndpoint(node1, bytesToken(0));
 +        ClusterMetadataTestHelper.register(broadcastAddress);
 +        ClusterMetadataTestHelper.joinPartially(broadcastAddress, 
bytesToken(100));
 +        assertEquals(NodeState.BOOTSTRAPPING, 
ClusterMetadata.current().directory.peerState(broadcastAddress));
 +
 +        verifyPartitionReadAccepted();
 +    }
 +
 +    private void verifyPartitionReadAccepted() throws Exception
 +    {
 +        ListenableFuture<MessageDelivery> messageSink = 
registerOutgoingMessageSink();
 +        int messageId = randomInt();
 +        int key = 50;
 +        ReadCommand command = partitionRead(key);
 +        handler.doVerb(Message.builder(READ_REQ, 
command).from(node1).withId(messageId).build());
 +        getAndVerifyResponse(messageSink, messageId, false);
 +    }
 +
 +    @Test(expected = InvalidRoutingException.class)
 +    public void rejectPartitionReadForTokenOutOfRange() 
 +    {
 +        // reject a read for a key whose token the node doesn't own the range for
 +        int messageId = randomInt();
 +        int key = 200;
 +        ReadCommand command = partitionRead(key);
 +        handler.doVerb(Message.builder(READ_REQ, 
command).from(node1).withId(messageId).build());
 +        // we automatically send a failure response if doVerb above throws
 +    }
 +
 +    @Test
 +    public void acceptRangeReadForNaturalEndpoint() throws Exception
 +    {
 +        // reset ClusterMetadata then join the remote node and partially
 +        // join the localhost one to emulate pending ranges
 +        ServerTestUtils.resetCMS();
 +        ClusterMetadataTestHelper.addEndpoint(node1, bytesToken(0));
 +        ClusterMetadataTestHelper.register(broadcastAddress);
 +        ClusterMetadataTestHelper.joinPartially(broadcastAddress, 
bytesToken(100));
 +        assertEquals(NodeState.BOOTSTRAPPING, 
ClusterMetadata.current().directory.peerState(broadcastAddress));
 +
 +        verifyRangeReadAccepted();
 +    }
 +
 +    @Test
 +    public void acceptRangeReadForPendingEndpoint() throws Exception
 +    {
 +        verifyRangeReadAccepted();
 +    }
 +
 +    private void verifyRangeReadAccepted() throws Exception
 +    {
 +        ListenableFuture<MessageDelivery> messageSink = 
registerOutgoingMessageSink();
 +        int messageId = randomInt();
 +        ReadCommand command = rangeRead(50, 60);
 +        handler.doVerb(Message.builder(READ_REQ, 
command).from(node1).withId(messageId).build());
 +        getAndVerifyResponse(messageSink, messageId, false);
 +    }
 +
 +    @Test(expected = InvalidRoutingException.class)
 +    public void rejectRangeReadForUnownedRange() throws Exception
 +    {
 +        int messageId = randomInt();
 +        ReadCommand command = rangeRead(150, 160);
 +        handler.doVerb(Message.builder(READ_REQ, 
command).from(node1).withId(messageId).build());
 +    }
 +
 +    private void getAndVerifyResponse(ListenableFuture<MessageDelivery> 
messageSink,
 +                                      int messageId,
 +                                      boolean isOutOfRange) throws 
InterruptedException, ExecutionException, TimeoutException
 +    {
 +        MessageDelivery response = messageSink.get(100, 
TimeUnit.MILLISECONDS);
 +        assertEquals(isOutOfRange ? Verb.FAILURE_RSP : Verb.READ_RSP, 
response.message.verb());
 +        assertEquals(broadcastAddress, response.message.from());
 +        assertEquals(isOutOfRange, response.message.payload instanceof 
RequestFailureReason);
 +        assertEquals(messageId, response.message.id());
 +        assertEquals(node1, response.to);
 +        assertEquals(startingTotalMetricCount + (isOutOfRange ? 1 : 0), 
StorageMetrics.totalOpsForInvalidToken.getCount());
 +        assertEquals(startingKeyspaceMetricCount + (isOutOfRange ? 1 : 0), 
keyspaceMetricValue(cfs));
 +    }
 +
 +    private ReadCommand partitionRead(int key)
 +    {
 +        return new StubReadCommand(key, metadata_nonreplicated);
 +    }
 +
 +    private ReadCommand rangeRead(int start, int end)
 +    {
 +        Range<Token> range = new Range<>(key(metadata_nonreplicated, 
start).getToken(),
 +                                         key(metadata_nonreplicated, 
end).getToken());
 +        return new StubRangeReadCommand(range, metadata_nonreplicated);
 +    }
 +
 +    private static class StubReadCommand extends SinglePartitionReadCommand
 +    {
 +        private final TableMetadata tmd;
 +
 +        StubReadCommand(int key, TableMetadata tmd)
 +        {
 +            super(tmd.epoch,
 +                  false,
 +                  0,
 +                  false,
 +                  tmd,
 +                  FBUtilities.nowInSeconds(),
 +                  ColumnFilter.all(tmd),
-                   RowFilter.NONE,
++                  RowFilter.none(),
 +                  DataLimits.NONE,
 +                  key(tmd, key),
 +                  null,
 +                  null,
 +                  false);
 +
 +            this.tmd = tmd;
 +        }
 +
 +        public UnfilteredPartitionIterator 
executeLocally(ReadExecutionController executionController)
 +        {
 +            return EmptyIterators.unfilteredPartition(tmd);
 +        }
 +
 +        public String toString()
 +        {
 +            return "<<StubReadCommand>>";
 +        }
 +    }
 +
 +    private static class StubRangeReadCommand extends 
PartitionRangeReadCommand
 +    {
 +        private final TableMetadata cfm;
 +
 +        StubRangeReadCommand(Range<Token> range, TableMetadata tmd)
 +        {
 +            super(tmd.epoch,
 +                  false,
 +                  0,
 +                  false,
 +                  tmd,
 +                  FBUtilities.nowInSeconds(),
 +                  ColumnFilter.all(tmd),
-                   RowFilter.NONE,
++                  RowFilter.none(),
 +                  DataLimits.NONE,
 +                  DataRange.forTokenRange(range),
 +                  null,
 +                  false);
 +
 +            this.cfm = tmd;
 +        }
 +
 +        public UnfilteredPartitionIterator 
executeLocally(ReadExecutionController executionController)
 +        {
 +            return EmptyIterators.unfilteredPartition(cfm);
 +        }
 +    }
 +
 +    private static long keyspaceMetricValue(ColumnFamilyStore cfs)
 +    {
 +        return cfs.keyspace.metric.outOfRangeTokenReads.getCount();
 +    }
 +}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to