This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-3.11
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 6301bea273fb9cd1afdca3842ec68f19f13b17b6
Merge: 91c12bd 1d87da3
Author: Marcus Eriksson <[email protected]>
AuthorDate: Mon Jun 21 15:46:51 2021 +0200

    Merge branch 'cassandra-3.0' into cassandra-3.11

 CHANGES.txt                                        |   1 +
 .../apache/cassandra/config/ColumnDefinition.java  |  23 -----
 src/java/org/apache/cassandra/db/Columns.java      |  18 +---
 .../apache/cassandra/db/SerializationHeader.java   |  17 +---
 .../cassandra/db/UnknownColumnException.java       |  12 +--
 .../apache/cassandra/db/filter/ColumnFilter.java   |   8 +-
 .../cassandra/db/partitions/PartitionUpdate.java   |   7 --
 .../cassandra/db/rows/UnfilteredSerializer.java    |  16 +--
 .../cassandra/distributed/test/SchemaTest.java     | 113 ++++++++-------------
 .../distributed/test/SimpleReadWriteTest.java      |  50 ---------
 test/unit/org/apache/cassandra/db/ColumnsTest.java |   2 +-
 11 files changed, 58 insertions(+), 209 deletions(-)

diff --cc CHANGES.txt
index 34a5973,91d85be..83fc711
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,13 -1,5 +1,14 @@@
 -3.0.25:
 +3.11.11
 + * Fix LeveledCompactionStrategy compacts last level throw an ArrayIndexOutOfBoundsException (CASSANDRA-15669)
 + * Maps $CASSANDRA_LOG_DIR to cassandra.logdir java property when executing nodetool (CASSANDRA-16199)
 + * Nodetool garbagecollect should retain SSTableLevel for LCS (CASSANDRA-16634)
 + * Ignore stale acks received in the shadow round (CASSANDRA-16588)
 + * Add autocomplete and error messages for provide_overlapping_tombstones (CASSANDRA-16350)
 + * Add StorageServiceMBean.getKeyspaceReplicationInfo(keyspaceName) (CASSANDRA-16447)
 + * Make sure sstables with moved starts are removed correctly in LeveledGenerations (CASSANDRA-16552)
 + * Upgrade jackson-databind to 2.9.10.8 (CASSANDRA-16462)
 +Merged from 3.0:
+  * Adding columns via ALTER TABLE can generate corrupt sstables (CASSANDRA-16735)
   * Add flag to disable ALTER...DROP COMPACT STORAGE statements (CASSANDRA-16733)
   * Clean transaction log leftovers at the beginning of sstablelevelreset and sstableofflinerelevel (CASSANDRA-12519)
   * CQL shell should prefer newer TLS version by default (CASSANDRA-16695)
diff --cc src/java/org/apache/cassandra/db/Columns.java
index 512b695,18e17d7..8efb848
--- a/src/java/org/apache/cassandra/db/Columns.java
+++ b/src/java/org/apache/cassandra/db/Columns.java
@@@ -460,13 -441,8 +460,9 @@@ public class Columns extends AbstractCo
                     // fail deserialization because of that. So we grab a "fake" ColumnDefinition that ensure proper
                     // deserialization. The column will be ignore later on anyway.
                     column = metadata.getDroppedColumnDefinition(name);
 +
-                     // If there's no dropped column, it may be for a column we haven't received a schema update for yet
-                     // so we create a placeholder column. If this is a read, the placeholder column will let the response
-                     // serializer know we're not serializing all requested columns when it writes the row flags, but it
-                     // will cause mutations that try to write values for this column to fail.
                      if (column == null)
-                         column = ColumnDefinition.placeholder(metadata, name, isStatic);
+                         throw new RuntimeException("Unknown column " + UTF8Type.instance.getString(name) + " during deserialization");
                  }
                  builder.add(column);
              }
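
The heart of the fix is in this Columns.java hunk: instead of fabricating a placeholder ColumnDefinition for a name it cannot resolve, deserialization now falls back to dropped-column metadata (sstables may legitimately hold data for columns removed via ALTER TABLE ... DROP) and otherwise fails outright. A minimal, self-contained sketch of that resolve-or-fail pattern; SchemaView, resolveColumn and the maps below are illustrative stand-ins, not the real CFMetaData API:

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    // Illustrative only: stands in for CFMetaData's column lookups.
    final class SchemaView
    {
        private final Map<String, Object> liveColumns;     // current schema
        private final Map<String, Object> droppedColumns;  // dropped via ALTER TABLE

        SchemaView(Map<String, Object> live, Map<String, Object> dropped)
        {
            this.liveColumns = live;
            this.droppedColumns = dropped;
        }

        Object resolveColumn(ByteBuffer name)
        {
            String key = StandardCharsets.UTF_8.decode(name.duplicate()).toString();
            Object column = liveColumns.get(key);
            if (column == null)
                // dropped columns still deserialize; their data is ignored later
                column = droppedColumns.get(key);
            if (column == null)
                // unknown to both live and dropped schema: fail fast rather than
                // invent a placeholder definition that can poison later writes
                throw new RuntimeException("Unknown column " + key + " during deserialization");
            return column;
        }
    }
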
diff --cc src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index dfbce4a,c658c12..d7b4c81
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@@ -509,11 -406,11 +509,11 @@@ public class ColumnFilte
                  }
              }
  
 -            if (hasSelection)
 +            if (hasQueried)
              {
-                 Columns statics = Columns.serializer.deserializeStatics(in, metadata);
-                 Columns regulars = Columns.serializer.deserializeRegulars(in, metadata);
+                 Columns statics = Columns.serializer.deserialize(in, metadata);
+                 Columns regulars = Columns.serializer.deserialize(in, metadata);
 -                selection = new PartitionColumns(statics, regulars);
 +                queried = new PartitionColumns(statics, regulars);
              }
  
             SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections = null;
diff --cc src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 0890611,0342e39..c81ac9d
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@@ -19,18 -19,13 +19,17 @@@ package org.apache.cassandra.db.rows
  
  import java.io.IOException;
  
 -import com.google.common.collect.Collections2;
  
 +import net.nicoulaj.compilecommand.annotations.Inline;
  import org.apache.cassandra.config.ColumnDefinition;
- import org.apache.cassandra.db.marshal.UTF8Type;
  import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.rows.Row.Deletion;
  import org.apache.cassandra.io.util.DataInputPlus;
 +import org.apache.cassandra.io.util.DataOutputBuffer;
  import org.apache.cassandra.io.util.DataOutputPlus;
 +import org.apache.cassandra.io.util.FileDataInput;
  import org.apache.cassandra.utils.SearchIterator;
 +import org.apache.cassandra.utils.WrappedException;
  
  /**
   * Serialize/deserialize a single Unfiltered (both on-wire and on-disk).
@@@ -156,7 -133,7 +155,7 @@@ public class UnfilteredSerialize
          LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
          Row.Deletion deletion = row.deletion();
          boolean hasComplexDeletion = row.hasComplexDeletion();
-         boolean hasAllColumns = header.hasAllColumns(row, isStatic);
 -        boolean hasAllColumns = (row.columnCount() == headerColumns.size());
++        boolean hasAllColumns = row.columnCount() == headerColumns.size();
          boolean hasExtendedFlags = hasExtendedFlags(row);
  
          if (isStatic)
@@@ -230,42 -184,20 +229,37 @@@
             Columns.serializer.serializeSubset(row.columns(), headerColumns, out);
  
         SearchIterator<ColumnDefinition, ColumnDefinition> si = headerColumns.iterator();
 -        for (ColumnData data : row)
 +
 +        try
          {
 -            // We can obtain the column for data directly from data.column(). However, if the cell/complex data
 -            // originates from a sstable, the column we'll get will have the type used when the sstable was serialized,
 -            // and if that type have been recently altered, that may not be the type we want to serialize the column
 -            // with. So we use the ColumnDefinition from the "header" which is "current". Also see #11810 for what
 -            // happens if we don't do that.
 -            ColumnDefinition column = si.next(data.column());
 -            assert column != null;
 +            row.apply(cd -> {
 +                // We can obtain the column for data directly from data.column(). However, if the cell/complex data
 +                // originates from a sstable, the column we'll get will have the type used when the sstable was serialized,
 +                // and if that type have been recently altered, that may not be the type we want to serialize the column
 +                // with. So we use the ColumnDefinition from the "header" which is "current". Also see #11810 for what
 +                // happens if we don't do that.
 +                ColumnDefinition column = si.next(cd.column());
- 
-                 // we may have columns that the remote node isn't aware of due to inflight schema changes
-                 // in cases where it tries to fetch all columns, it will set the `all columns` flag, but only
-                 // expect a subset of columns (from this node's perspective). See CASSANDRA-15899
-                 if (column == null)
-                     return;
++                assert column != null;
 +
 +                try
 +                {
 +                    if (cd.column.isSimple())
 +                        Cell.serializer.serialize((Cell) cd, column, out, pkLiveness, header);
 +                    else
 +                        writeComplexColumn((ComplexColumnData) cd, column, (flags & HAS_COMPLEX_DELETION) != 0, pkLiveness, header, out);
 +                }
 +                catch (IOException e)
 +                {
 +                    throw new WrappedException(e);
 +                }
 +            }, false);
 +        }
 +        catch (WrappedException e)
 +        {
 +            if (e.getCause() instanceof IOException)
 +                throw (IOException) e.getCause();
  
 -            if (data.column.isSimple())
 -                Cell.serializer.serialize((Cell) data, column, out, pkLiveness, header);
 -            else
 -                writeComplexColumn((ComplexColumnData) data, column, hasComplexDeletion, pkLiveness, header, out);
 +            throw e;
          }
      }
  
@@@ -342,7 -274,7 +336,7 @@@
          LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
          Row.Deletion deletion = row.deletion();
          boolean hasComplexDeletion = row.hasComplexDeletion();
-         boolean hasAllColumns = header.hasAllColumns(row, isStatic);
 -        boolean hasAllColumns = (row.columnCount() == headerColumns.size());
++        boolean hasAllColumns = row.columnCount() == headerColumns.size();
  
          if (!pkLiveness.isEmpty())
              size += header.timestampSerializedSize(pkLiveness.timestamp());
@@@ -603,35 -482,12 +597,31 @@@
             builder.addRowDeletion(hasDeletion ? new Row.Deletion(header.readDeletionTime(in), deletionIsShadowable) : Row.Deletion.LIVE);
  
             Columns columns = hasAllColumns ? headerColumns : Columns.serializer.deserializeSubset(headerColumns, in);
 -            for (ColumnDefinition column : columns)
 +
 +            final LivenessInfo livenessInfo = rowLiveness;
 +
 +            try
              {
 -                if (column.isSimple())
 -                    readSimpleColumn(column, in, header, helper, builder, rowLiveness);
 -                else
 -                    readComplexColumn(column, in, header, helper, hasComplexDeletion, builder, rowLiveness);
 +                columns.apply(column -> {
 +                    try
 +                    {
-                         // if the column is a placeholder, then it's not part of our schema, and we can't deserialize it
-                         if (column.isPlaceholder())
-                             throw new UnknownColumnException(column.ksName, column.cfName, column.name.bytes);
- 
 +                        if (column.isSimple())
 +                            readSimpleColumn(column, in, header, helper, builder, livenessInfo);
 +                        else
 +                            readComplexColumn(column, in, header, helper, hasComplexDeletion, builder, livenessInfo);
 +                    }
 +                    catch (IOException e)
 +                    {
 +                        throw new WrappedException(e);
 +                    }
 +                }, false);
 +            }
 +            catch (WrappedException e)
 +            {
 +                if (e.getCause() instanceof IOException)
 +                    throw (IOException) e.getCause();
 +
 +                throw e;
              }
  
              return builder.build();
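
Both new apply-based loops above (row.apply in the serialize path, columns.apply in the deserialize path) run the per-column work inside a lambda, which cannot throw the checked IOException directly, so the patch tunnels it out in an unchecked WrappedException and rethrows it at the method boundary. A self-contained sketch of the same pattern; WrappedIOException and Writer are hypothetical stand-ins for org.apache.cassandra.utils.WrappedException and the serializer callbacks:

    import java.io.IOException;
    import java.util.List;

    // Unchecked carrier for a checked IOException thrown inside a lambda.
    final class WrappedIOException extends RuntimeException
    {
        WrappedIOException(IOException cause)
        {
            super(cause);
        }
    }

    final class ExceptionTunnel
    {
        interface Writer
        {
            void write(String s) throws IOException;
        }

        // forEach takes a Consumer, whose accept() cannot throw IOException,
        // so the lambda wraps the checked exception...
        static void writeAll(List<String> cells, Writer out) throws IOException
        {
            try
            {
                cells.forEach(cell -> {
                    try
                    {
                        out.write(cell);
                    }
                    catch (IOException e)
                    {
                        throw new WrappedIOException(e);
                    }
                });
            }
            catch (WrappedIOException e)
            {
                // ...and the boundary unwraps it, restoring the checked contract.
                throw (IOException) e.getCause();
            }
        }
    }

The cast back to IOException is why both call sites in the patch check getCause() with instanceof before rethrowing, and rethrow the WrappedException itself otherwise.
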
diff --cc test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java
index 2b5dab1,b8860fd..8f7b47f
--- a/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java
@@@ -18,16 -18,12 +18,11 @@@
  
  package org.apache.cassandra.distributed.test;
  
- import java.util.function.Consumer;
- 
  import org.junit.Test;
  
- import org.apache.cassandra.dht.ByteOrderedPartitioner;
  import org.apache.cassandra.distributed.Cluster;
  import org.apache.cassandra.distributed.api.ConsistencyLevel;
- import org.apache.cassandra.distributed.api.IInstanceConfig;
--
- import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+ import static org.junit.Assert.assertTrue;
  
  public class SchemaTest extends TestBaseImpl
  {
diff --cc test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
index 84a2307,3e8b76b..b7cef5f
--- a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
@@@ -137,47 -134,11 +137,16 @@@ public class SimpleReadWriteTest extend
          }
  
          Assert.assertTrue(thrown.getMessage().contains("Exception occurred on node"));
 -        Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2 during deserialization"));
 +        Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2"));
      }
  
-     /**
-      * If a node receives a mutation for a column it knows has been dropped, the write should succeed
-      */
-     @Test
-     public void writeWithSchemaDisagreement2() throws Throwable
-     {
-         cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
 +
-         cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
-         cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
-         cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
- 
-         for (int i=0; i<cluster.size(); i++)
-             cluster.get(i+1).flush(KEYSPACE);;
- 
-         // Introduce schema disagreement
-         cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl DROP v2", 1);
- 
-         // execute a write including the dropped column where the coordinator is not yet aware of the drop
-         // all nodes should process this without error
-         cluster.coordinator(2).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
-                                            ConsistencyLevel.ALL);
-         // and flushing should also be fine
-         for (int i=0; i<cluster.size(); i++)
-             cluster.get(i+1).flush(KEYSPACE);;
-         // the results of reads will vary depending on whether the coordinator has seen the schema change
-         // note: read repairs will propagate the v2 value to node1, but this is safe and handled correctly
-         assertRows(cluster.coordinator(2).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL),
-                    rows(row(1,1,1,1), row(2,2,2,2)));
-         assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL),
-                    rows(row(1,1,1), row(2,2,2)));
-     }
 +
 +    /**
 +     * If a node isn't aware of a column, but receives a mutation without that column, the write should succeed
 +     */
      @Test
 -    public void readWithSchemaDisagreement() throws Throwable
 +    public void writeWithInconsequentialSchemaDisagreement() throws Throwable
      {
          cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
  
@@@ -188,30 -149,22 +157,11 @@@
          // Introduce schema disagreement
         cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
  
 -        Exception thrown = null;
 -        try
 -        {
 -            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
 -                                                      ConsistencyLevel.ALL),
 -                       row(1, 1, 1, null));
 -        }
 -        catch (Exception e)
 -        {
 -            thrown = e;
 -        }
 -
 -        Assert.assertTrue(thrown.getMessage().contains("Exception occurred on node"));
 -        Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2 during deserialization"));
 +        // this write shouldn't cause any problems because it doesn't write to the new column
 +        cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (2, 2, 2)",
 +                                       ConsistencyLevel.ALL);
      }
  
-     /**
-      * If a node receives a read for a column it's not aware of, it shouldn't complain, since it won't have any data for that column
-      */
-     @Test
-     public void readWithSchemaDisagreement() throws Throwable
-     {
-         cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))");
- 
-         cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
-         cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
-         cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
- 
-         // Introduce schema disagreement
-         cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
- 
-         Object[][] expected = new Object[][]{new Object[]{1, 1, 1, null}};
-         assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.ALL), expected);
-     }
- 
      @Test
      public void simplePagedReadsTest() throws Throwable
      {

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
