This is an automated email from the ASF dual-hosted git repository.

adelapena pushed a commit to branch cassandra-4.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-4.0 by this push:
     new 9a0af4112e Fix legacy clustering serialization for paging with compact 
storage
9a0af4112e is described below

commit 9a0af4112e87f5b97056aa39e63c5ab461b60237
Author: Andrés de la Peña <[email protected]>
AuthorDate: Mon Jan 9 13:50:47 2023 +0000

    Fix legacy clustering serialization for paging with compact storage
    
    patch by Andrés de la Peña; reviewed by Berenguer Blasi and Piotr 
Kołaczkowski for CASSANDRA-17507
---
 CHANGES.txt                                        |   1 +
 NEWS.txt                                           |  14 ++
 .../cassandra/service/pager/PagingState.java       |   8 +
 .../CompactStoragePagingWithProtocolTester.java    | 179 +++++++++++++++++++++
 .../CompactStoragePagingWithProtocolV30Test.java   |  33 ++++
 .../CompactStoragePagingWithProtocolV3XTest.java   |  33 ++++
 6 files changed, 268 insertions(+)

diff --git a/CHANGES.txt b/CHANGES.txt
index 8794ae533a..f032fc390e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0.8
+ * Fix legacy clustering serialization for paging with compact storage 
(CASSANDRA-17507)
  * Add support for python 3.11 (CASSANDRA-18088)
  * Fix formatting of duration in cqlsh (CASSANDRA-18141)
  * Fix sstable loading of keyspaces named snapshots or backups 
(CASSANDRA-14013)
diff --git a/NEWS.txt b/NEWS.txt
index 7d6bf91cf9..4915652351 100644
--- a/NEWS.txt
+++ b/NEWS.txt
@@ -51,6 +51,20 @@ restore snapshots created with the previous major version 
using the
 'sstableloader' tool. You can upgrade the file format of your snapshots
 using the provided 'sstableupgrade' tool.
 
+4.0.8
+=====
+
+New features
+------------
+
+Upgrading
+---------
+    - All previous versions of 4.x contained a mistake on the implementation 
of the old CQL native protocol v3. That
+    mistake produced issues when paging over tables with compact storage and a 
single clustering column during rolling
+    upgrades involving 3.x and 4.x nodes. The fix for that issue makes it can 
now appear during rolling upgrades from
+    4.0.0-4.0.7. If that is your case, please use protocol v4 or higher in 
your driver. See CASSANDRA-17507 for further
+    details.
+
 4.0.6
 =====
 
diff --git a/src/java/org/apache/cassandra/service/pager/PagingState.java 
b/src/java/org/apache/cassandra/service/pager/PagingState.java
index 2b160329de..2c2b08bf72 100644
--- a/src/java/org/apache/cassandra/service/pager/PagingState.java
+++ b/src/java/org/apache/cassandra/service/pager/PagingState.java
@@ -422,6 +422,10 @@ public class PagingState
         // Old (pre-3.0) encoding of cells. We need that for the protocol v3 
as that is how things where encoded
         private static ByteBuffer encodeCellName(TableMetadata metadata, 
Clustering<?> clustering, ByteBuffer columnName, ByteBuffer collectionElement)
         {
+            // v30 and v3X don't use composites for single-element clusterings 
in compact tables
+            if (metadata.isCompactTable() && metadata.comparator.size() == 1)
+                return clustering.bufferAt(0);
+
             boolean isStatic = clustering == Clustering.STATIC_CLUSTERING;
 
             // We use comparator.size() rather than clustering.size() because 
of static clusterings
@@ -458,6 +462,10 @@ public class PagingState
             if (csize == 0)
                 return Clustering.EMPTY;
 
+            // v30 and v3X don't use composites for single-element clusterings 
in compact tables
+            if (metadata.isCompactTable() && metadata.comparator.size() == 1)
+                return Clustering.make(value);
+
             if (CompositeType.isStaticName(value, ByteBufferAccessor.instance))
                 return Clustering.STATIC_CLUSTERING;
 
diff --git 
a/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStoragePagingWithProtocolTester.java
 
b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStoragePagingWithProtocolTester.java
new file mode 100644
index 0000000000..2683e7d4d3
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStoragePagingWithProtocolTester.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+import org.junit.Test;
+
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ProtocolVersion;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.SimpleStatement;
+import com.datastax.driver.core.Statement;
+import com.vdurmont.semver4j.Semver;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests paging over a table with {@code COMPACT STORAGE} in a mixed version 
cluster using different protocol versions.
+ */
+public abstract class CompactStoragePagingWithProtocolTester extends 
UpgradeTestBase
+{
+    /**
+     * The initial version from which we are upgrading.
+     */
+    protected abstract Semver initialVersion();
+
+    @Test
+    public void testPagingWithCompactStorageSingleClustering() throws Throwable
+    {
+        Object[] row1 = new Object[]{ "0", "01", "v" };
+        Object[] row2 = new Object[]{ "0", "02", "v" };
+        Object[] row3 = new Object[]{ "1", "01", "v" };
+        Object[] row4 = new Object[]{ "1", "02", "v" };
+
+        new TestCase()
+        .nodes(2)
+        .nodesToUpgrade(1)
+        .singleUpgrade(initialVersion(), CURRENT)
+        .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))
+        .setup(c -> {
+            c.schemaChange(withKeyspace("CREATE TABLE %s.t (pk text, ck text, 
v text, " +
+                                        "PRIMARY KEY (pk, ck)) WITH COMPACT 
STORAGE"));
+            String insert = withKeyspace("INSERT INTO %s.t (pk, ck, v) VALUES 
(?, ?, ?)");
+            c.coordinator(1).execute(insert, ConsistencyLevel.ALL, row1);
+            c.coordinator(1).execute(insert, ConsistencyLevel.ALL, row2);
+            c.coordinator(1).execute(insert, ConsistencyLevel.ALL, row3);
+            c.coordinator(1).execute(insert, ConsistencyLevel.ALL, row4);
+        })
+        .runAfterNodeUpgrade((cluster, node) -> 
assertRowsWithAllProtocolVersions(row1, row2, row3, row4))
+        .run();
+    }
+
+    @Test
+    public void testPagingWithCompactStorageMultipleClusterings() throws 
Throwable
+    {
+        Object[] row1 = new Object[]{ "0", "01", "10", "v" };
+        Object[] row2 = new Object[]{ "0", "01", "20", "v" };
+        Object[] row3 = new Object[]{ "0", "02", "10", "v" };
+        Object[] row4 = new Object[]{ "0", "02", "20", "v" };
+        Object[] row5 = new Object[]{ "1", "01", "10", "v" };
+
+        new TestCase()
+        .nodes(2)
+        .nodesToUpgrade(1)
+        .singleUpgrade(initialVersion(), CURRENT)
+        .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))
+        .setup(c -> {
+            c.schemaChange(withKeyspace("CREATE TABLE %s.t (pk text, ck1 text, 
ck2 text, v text, " +
+                                        "PRIMARY KEY (pk, ck1, ck2)) WITH 
COMPACT STORAGE"));
+            String insert = withKeyspace("INSERT INTO %s.t (pk, ck1, ck2, v) 
VALUES (?, ?, ?, ?)");
+            c.coordinator(1).execute(insert, ConsistencyLevel.ALL, row1);
+            c.coordinator(1).execute(insert, ConsistencyLevel.ALL, row2);
+            c.coordinator(1).execute(insert, ConsistencyLevel.ALL, row3);
+            c.coordinator(1).execute(insert, ConsistencyLevel.ALL, row4);
+            c.coordinator(1).execute(insert, ConsistencyLevel.ALL, row5);
+        })
+        .runAfterNodeUpgrade((cluster, node) -> 
assertRowsWithAllProtocolVersions(row1, row2, row3, row4, row5))
+        .run();
+    }
+
+    @Test
+    public void testPagingWithCompactStorageWithoutClustering() throws 
Throwable
+    {
+        Object[] row1 = new Object[]{ "1", "v1", "v2" };
+        Object[] row2 = new Object[]{ "2", "v1", "v2" };
+        Object[] row3 = new Object[]{ "3", "v1", "v2" };
+
+        new TestCase()
+        .nodes(2)
+        .nodesToUpgrade(1)
+        .singleUpgrade(initialVersion(), CURRENT)
+        .withConfig(config -> config.with(GOSSIP, NETWORK, NATIVE_PROTOCOL))
+        .setup(c -> {
+            c.schemaChange(withKeyspace("CREATE TABLE %s.t (pk text PRIMARY 
KEY, v1 text, v2 text) WITH COMPACT STORAGE"));
+            String insert = withKeyspace("INSERT INTO %s.t (pk, v1, v2) VALUES 
(?, ?, ?)");
+            c.coordinator(1).execute(insert, ConsistencyLevel.ALL, row1);
+            c.coordinator(1).execute(insert, ConsistencyLevel.ALL, row2);
+            c.coordinator(1).execute(insert, ConsistencyLevel.ALL, row3);
+        })
+        .runAfterNodeUpgrade((cluster, node) -> 
assertRowsWithAllProtocolVersions(row3, row2, row1))
+        .run();
+    }
+
+    private void assertRowsWithAllProtocolVersions(Object[]... rows)
+    {
+        String query = withKeyspace("SELECT * FROM %s.t");
+        assertRows(query, ProtocolVersion.V3, rows);
+        assertRows(query, ProtocolVersion.V4, rows);
+        if (initialVersion().isGreaterThanOrEqualTo(v3X))
+            assertRows(query, ProtocolVersion.V5, rows);
+    }
+
+    private static void assertRows(String query, ProtocolVersion 
protocolVersion, Object[]... expectedRows)
+    {
+        Cluster.Builder builder = com.datastax.driver.core.Cluster.builder()
+                                                                  
.addContactPoint("127.0.0.1")
+                                                                  
.withProtocolVersion(protocolVersion);
+        try (com.datastax.driver.core.Cluster cluster = builder.build();
+             Session session = cluster.connect())
+        {
+            Statement stmt = new SimpleStatement(query);
+            
stmt.setConsistencyLevel(com.datastax.driver.core.ConsistencyLevel.ALL);
+            stmt.setFetchSize(1);
+
+            ResultSet result = session.execute(stmt);
+            List<Row> actualRows = result.all();
+            assertEquals(expectedRows.length, actualRows.size());
+
+            ColumnDefinitions columnDefs = result.getColumnDefinitions();
+            com.datastax.driver.core.ProtocolVersion driverProtocolVersion =
+            
com.datastax.driver.core.ProtocolVersion.fromInt(protocolVersion.toInt());
+
+            for (int rowIndex = 0; rowIndex < expectedRows.length; rowIndex++)
+            {
+                Object[] expectedRow = expectedRows[rowIndex];
+                Row actualRow = actualRows.get(rowIndex);
+
+                assertEquals(expectedRow.length, 
actualRow.getColumnDefinitions().size());
+
+                for (int columnIndex = 0; columnIndex < columnDefs.size(); 
columnIndex++)
+                {
+                    DataType type = columnDefs.getType(columnIndex);
+                    ByteBuffer expectedByteValue = cluster.getConfiguration()
+                                                          .getCodecRegistry()
+                                                          .codecFor(type)
+                                                          
.serialize(expectedRow[columnIndex], driverProtocolVersion);
+                    ByteBuffer actualValue = 
actualRow.getBytesUnsafe(columnDefs.getName(columnIndex));
+                    assertEquals(expectedByteValue, actualValue);
+                }
+            }
+        }
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStoragePagingWithProtocolV30Test.java
 
b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStoragePagingWithProtocolV30Test.java
new file mode 100644
index 0000000000..518a514146
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStoragePagingWithProtocolV30Test.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import com.vdurmont.semver4j.Semver;
+
+/**
+ * {@link CompactStoragePagingWithProtocolTester} for v30 -> CURRENT upgrade 
path.
+ */
+public class CompactStoragePagingWithProtocolV30Test extends 
CompactStoragePagingWithProtocolTester
+{
+    @Override
+    protected Semver initialVersion()
+    {
+        return v30;
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStoragePagingWithProtocolV3XTest.java
 
b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStoragePagingWithProtocolV3XTest.java
new file mode 100644
index 0000000000..003c372ceb
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/upgrade/CompactStoragePagingWithProtocolV3XTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import com.vdurmont.semver4j.Semver;
+
+/**
+ * {@link CompactStoragePagingWithProtocolTester} for v3X -> CURRENT upgrade 
path.
+ */
+public class CompactStoragePagingWithProtocolV3XTest extends 
CompactStoragePagingWithProtocolTester
+{
+    @Override
+    protected Semver initialVersion()
+    {
+        return v3X;
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to