This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
     new 1d87da3  Adding columns via ALTER TABLE can generate corrupt sstables
1d87da3 is described below

commit 1d87da3f6fc0eca4e805238c19db16e6607b44a7
Author: Marcus Eriksson <[email protected]>
AuthorDate: Mon Jun 14 11:42:58 2021 +0200

    Adding columns via ALTER TABLE can generate corrupt sstables
    
    Patch by marcuse; reviewed by Alex Petrov and Sam Tunnicliffe for CASSANDRA-16735
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/config/ColumnDefinition.java  |  23 -----
 src/java/org/apache/cassandra/db/Columns.java      |  19 +---
 .../apache/cassandra/db/SerializationHeader.java   |  17 +---
 .../cassandra/db/UnknownColumnException.java       |  12 +--
 .../apache/cassandra/db/filter/ColumnFilter.java   |   8 +-
 .../cassandra/db/partitions/PartitionUpdate.java   |   7 --
 .../cassandra/db/rows/UnfilteredSerializer.java    |  19 +---
 .../cassandra/distributed/test/SchemaTest.java     | 112 ++++++++-------------
 .../distributed/test/SimpleReadWriteTest.java      |  86 +++-------------
 test/unit/org/apache/cassandra/db/ColumnsTest.java |   2 +-
 11 files changed, 76 insertions(+), 230 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 335050d..91d85be 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 3.0.25:
+ * Adding columns via ALTER TABLE can generate corrupt sstables (CASSANDRA-16735)
  * Add flag to disable ALTER...DROP COMPACT STORAGE statements (CASSANDRA-16733)
  * Clean transaction log leftovers at the beginning of sstablelevelreset and sstableofflinerelevel (CASSANDRA-12519)
  * CQL shell should prefer newer TLS version by default (CASSANDRA-16695)
diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java
index 93c89b5..6f7f749 100644
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -190,29 +190,6 @@ public class ColumnDefinition extends ColumnSpecification implements Comparable<
         };
     }
 
-    private static class Placeholder extends ColumnDefinition
-    {
-        Placeholder(CFMetaData table, ByteBuffer name, AbstractType<?> type, int position, Kind kind)
-        {
-            super(table, name, type, position, kind);
-        }
-
-        public boolean isPlaceholder()
-        {
-            return true;
-        }
-    }
-
-    public static ColumnDefinition placeholder(CFMetaData table, ByteBuffer name, boolean isStatic)
-    {
-        return new Placeholder(table, name, EmptyType.instance, NO_POSITION, isStatic ? Kind.STATIC : Kind.REGULAR);
-    }
-
-    public boolean isPlaceholder()
-    {
-        return false;
-    }
-
     public ColumnDefinition copy()
     {
         return new ColumnDefinition(ksName, cfName, name, type, position, kind);
diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java
index ef32fe0..18e17d7 100644
--- a/src/java/org/apache/cassandra/db/Columns.java
+++ b/src/java/org/apache/cassandra/db/Columns.java
@@ -425,7 +425,7 @@ public class Columns extends AbstractCollection<ColumnDefinition> implements Col
             return size;
         }
 
-        public Columns deserialize(DataInputPlus in, CFMetaData metadata, boolean isStatic) throws IOException
+        public Columns deserialize(DataInputPlus in, CFMetaData metadata) throws IOException
         {
             int length = (int)in.readUnsignedVInt();
             BTree.Builder<ColumnDefinition> builder = BTree.builder(Comparator.naturalOrder());
@@ -441,29 +441,14 @@ public class Columns extends AbstractCollection<ColumnDefinition> implements Col
                     // fail deserialization because of that. So we grab a "fake" ColumnDefinition that ensures proper
                     // deserialization. The column will be ignored later on anyway.
                     column = metadata.getDroppedColumnDefinition(name);
-
-                    // If there's no dropped column, it may be for a column we haven't received a schema update for yet
-                    // so we create a placeholder column. If this is a read, the placeholder column will let the response
-                    // serializer know we're not serializing all requested columns when it writes the row flags, but it
-                    // will cause mutations that try to write values for this column to fail.
                     if (column == null)
-                        column = ColumnDefinition.placeholder(metadata, name, isStatic);
+                        throw new RuntimeException("Unknown column " + UTF8Type.instance.getString(name) + " during deserialization");
                 }
                 builder.add(column);
             }
             return new Columns(builder.build());
         }
 
-        public Columns deserializeStatics(DataInputPlus in, CFMetaData metadata) throws IOException
-        {
-            return deserialize(in, metadata, true);
-        }
-
-        public Columns deserializeRegulars(DataInputPlus in, CFMetaData metadata) throws IOException
-        {
-            return deserialize(in, metadata, false);
-        }
-
         /**
         * If both ends have a pre-shared superset of the columns we are serializing, we can send them much
         * more efficiently. Both ends must provide the identical set of columns.
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java
index 428acd0..5c4f518 100644
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -40,7 +40,6 @@ import org.apache.cassandra.io.sstable.metadata.IMetadataComponentSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.SearchIterator;
 
 public class SerializationHeader
 {
@@ -161,18 +160,6 @@ public class SerializationHeader
         return !columns.statics.isEmpty();
     }
 
-    public boolean hasAllColumns(Row row, boolean isStatic)
-    {
-        SearchIterator<ColumnDefinition, ColumnData> rowIter = row.searchIterator();
-        Iterable<ColumnDefinition> columns = isStatic ? columns().statics : columns().regulars;
-        for (ColumnDefinition column : columns)
-        {
-            if (rowIter.next(column) == null)
-                return false;
-        }
-        return true;
-    }
-
     public boolean isForSSTable()
     {
         return isForSSTable;
@@ -455,8 +442,8 @@ public class SerializationHeader
             Columns statics, regulars;
             if (selection == null)
             {
-                statics = hasStatic ? Columns.serializer.deserializeStatics(in, metadata) : Columns.NONE;
-                regulars = Columns.serializer.deserializeRegulars(in, metadata);
+                statics = hasStatic ? Columns.serializer.deserialize(in, metadata) : Columns.NONE;
+                regulars = Columns.serializer.deserialize(in, metadata);
             }
             else
             {
diff --git a/src/java/org/apache/cassandra/db/UnknownColumnException.java b/src/java/org/apache/cassandra/db/UnknownColumnException.java
index cec60ea..55dc453 100644
--- a/src/java/org/apache/cassandra/db/UnknownColumnException.java
+++ b/src/java/org/apache/cassandra/db/UnknownColumnException.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.cassandra.config.CFMetaData;
@@ -28,19 +27,14 @@ import org.apache.cassandra.utils.ByteBufferUtil;
  * Exception thrown when we read a column internally that is unknown. Note that
  * this is an internal exception and is not meant to be user facing.
  */
-public class UnknownColumnException extends IOException
+public class UnknownColumnException extends Exception
 {
     public final ByteBuffer columnName;
 
-    public UnknownColumnException(String ksName, String cfName, ByteBuffer columnName)
-    {
-        super(String.format("Unknown column %s in table %s.%s", stringify(columnName), ksName, cfName));
-        this.columnName = columnName;
-    }
-
     public UnknownColumnException(CFMetaData metadata, ByteBuffer columnName)
     {
-        this(metadata.ksName, metadata.cfName, columnName);
+        super(String.format("Unknown column %s in table %s.%s", stringify(columnName), metadata.ksName, metadata.cfName));
+        this.columnName = columnName;
     }
 
     private static String stringify(ByteBuffer name)
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index cbc8871..c658c12 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@ -396,8 +396,8 @@ public class ColumnFilter
             {
                 if (version >= MessagingService.VERSION_3014)
                 {
-                    Columns statics = Columns.serializer.deserializeStatics(in, metadata);
-                    Columns regulars = Columns.serializer.deserializeRegulars(in, metadata);
+                    Columns statics = Columns.serializer.deserialize(in, metadata);
+                    Columns regulars = Columns.serializer.deserialize(in, metadata);
                     fetched = new PartitionColumns(statics, regulars);
                 }
                 else
@@ -408,8 +408,8 @@ public class ColumnFilter
 
             if (hasSelection)
             {
-                Columns statics = Columns.serializer.deserializeStatics(in, metadata);
-                Columns regulars = Columns.serializer.deserializeRegulars(in, metadata);
+                Columns statics = Columns.serializer.deserialize(in, metadata);
+                Columns regulars = Columns.serializer.deserialize(in, metadata);
                 selection = new PartitionColumns(statics, regulars);
             }
 
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 3a67073..3560e90 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -17,7 +17,6 @@
  */
 package org.apache.cassandra.db.partitions;
 
-import java.io.IOError;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -754,12 +753,6 @@ public class PartitionUpdate extends AbstractBTreePartition
                         deletionBuilder.add((RangeTombstoneMarker)unfiltered);
                 }
             }
-            catch (IOError e)
-            {
-                if (e.getCause() != null && e.getCause() instanceof UnknownColumnException)
-                    throw (UnknownColumnException) e.getCause();
-                throw e;
-            }
 
             MutableDeletionInfo deletionInfo = deletionBuilder.build();
             return new PartitionUpdate(metadata,
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 9e11f94..0342e39 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -19,9 +19,9 @@ package org.apache.cassandra.db.rows;
 
 import java.io.IOException;
 
+import com.google.common.collect.Collections2;
 
 import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.marshal.UTF8Type;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -133,7 +133,7 @@ public class UnfilteredSerializer
         LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
         Row.Deletion deletion = row.deletion();
         boolean hasComplexDeletion = row.hasComplexDeletion();
-        boolean hasAllColumns = header.hasAllColumns(row, isStatic);
+        boolean hasAllColumns = (row.columnCount() == headerColumns.size());
         boolean hasExtendedFlags = hasExtendedFlags(row);
 
         if (isStatic)
@@ -192,12 +192,7 @@ public class UnfilteredSerializer
             // with. So we use the ColumnDefinition from the "header" which is "current". Also see #11810 for what
             // happens if we don't do that.
             ColumnDefinition column = si.next(data.column());
-
-            // we may have columns that the remote node isn't aware of due to inflight schema changes
-            // in cases where it tries to fetch all columns, it will set the `all columns` flag, but only
-            // expect a subset of columns (from this node's perspective). See CASSANDRA-15899
-            if (column == null)
-                continue;
+            assert column != null;
 
             if (data.column.isSimple())
                 Cell.serializer.serialize((Cell) data, column, out, pkLiveness, header);
@@ -279,7 +274,7 @@ public class UnfilteredSerializer
         LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
         Row.Deletion deletion = row.deletion();
         boolean hasComplexDeletion = row.hasComplexDeletion();
-        boolean hasAllColumns = header.hasAllColumns(row, isStatic);
+        boolean hasAllColumns = (row.columnCount() == headerColumns.size());
 
         if (!pkLiveness.isEmpty())
             size += header.timestampSerializedSize(pkLiveness.timestamp());
@@ -298,8 +293,7 @@ public class UnfilteredSerializer
         for (ColumnData data : row)
         {
             ColumnDefinition column = si.next(data.column());
-            if (column == null)
-                continue;
+            assert column != null;
 
             if (data.column.isSimple())
                 size += Cell.serializer.serializedSize((Cell) data, column, pkLiveness, header);
@@ -490,9 +484,6 @@ public class UnfilteredSerializer
             Columns columns = hasAllColumns ? headerColumns : Columns.serializer.deserializeSubset(headerColumns, in);
             for (ColumnDefinition column : columns)
             {
-                // if the column is a placeholder, then it's not part of our schema, and we can't deserialize it
-                if (column.isPlaceholder())
-                    throw new UnknownColumnException(column.ksName, column.cfName, column.name.bytes);
                 if (column.isSimple())
                     readSimpleColumn(column, in, header, helper, builder, rowLiveness);
                 else
diff --git a/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java b/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java
index 2b5dab1..b8860fd 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java
@@ -18,100 +18,72 @@
 
 package org.apache.cassandra.distributed.test;
 
-import java.util.function.Consumer;
-
 import org.junit.Test;
 
-import org.apache.cassandra.dht.ByteOrderedPartitioner;
 import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
-import org.apache.cassandra.distributed.api.IInstanceConfig;
 
-import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.junit.Assert.assertTrue;
 
 public class SchemaTest extends TestBaseImpl
 {
-    private static final Consumer<IInstanceConfig> CONFIG_CONSUMER = config -> {
-        config.set("partitioner", ByteOrderedPartitioner.class.getSimpleName());
-        config.set("initial_token", Integer.toString(config.num() * 1000));
-    };
-
     @Test
-    public void dropColumnMixedMode() throws Throwable
+    public void readRepair() throws Throwable
     {
-        try (Cluster cluster = init(Cluster.create(2, CONFIG_CONSUMER)))
+        try (Cluster cluster = init(Cluster.build(2).start()))
         {
-            cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int 
primary key, v1 int, v2 int, v3 int)");
-            Object [][] someExpected = new Object[5][];
-            Object [][] allExpected1 = new Object[5][];
-            Object [][] allExpected2 = new Object[5][];
-            for (int i = 0; i < 5; i++)
-            {
-                int v1 = i * 10, v2 = i * 100, v3 = i * 1000;
-                cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl 
(id, v1, v2, v3) VALUES (?,?,?, ?)" , ConsistencyLevel.ALL, i, v1, v2, v3);
-                someExpected[i] = new Object[] {i, v1};
-                allExpected1[i] = new Object[] {i, v1, v3};
-                allExpected2[i] = new Object[] {i, v1, v2, v3};
-            }
-            cluster.forEach((instance) -> instance.flush(KEYSPACE));
-            cluster.get(1).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl DROP v2");
-            assertRows(cluster.coordinator(1).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
-            assertRows(cluster.coordinator(1).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected1);
-            assertRows(cluster.coordinator(2).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
-            assertRows(cluster.coordinator(2).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected2);
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v1 int, v2 int,  primary key (pk, ck))");
+            String name = "aaa";
+            cluster.get(1).schemaChangeInternal("ALTER TABLE " + KEYSPACE + ".tbl ADD " + name + " list<int>");
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) values (?,1,1,1)", 1);
+            selectSilent(cluster, name);
+
+            cluster.get(2).flush(KEYSPACE);
+            cluster.get(2).schemaChangeInternal("ALTER TABLE " + KEYSPACE + ".tbl ADD " + name + " list<int>");
+            cluster.get(2).shutdown();
+            cluster.get(2).startup();
+            cluster.get(2).forceCompact(KEYSPACE, "tbl");
         }
     }
 
     @Test
-    public void addColumnMixedMode() throws Throwable
+    public void readRepairWithCompaction() throws Throwable
     {
-        try (Cluster cluster = init(Cluster.create(2, CONFIG_CONSUMER)))
+        try (Cluster cluster = init(Cluster.build(2).start()))
         {
-            cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int 
primary key, v1 int, v2 int)");
-            Object [][] someExpected = new Object[5][];
-            Object [][] allExpected1 = new Object[5][];
-            Object [][] allExpected2 = new Object[5][];
-            for (int i = 0; i < 5; i++)
-            {
-                int v1 = i * 10, v2 = i * 100;
-                cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl 
(id, v1, v2) VALUES (?,?,?)" , ConsistencyLevel.ALL, i, v1, v2);
-                someExpected[i] = new Object[] {i, v1};
-                allExpected1[i] = new Object[] {i, v1, v2, null};
-                allExpected2[i] = new Object[] {i, v1, v2};
-            }
-            cluster.forEach((instance) -> instance.flush(KEYSPACE));
-            cluster.get(1).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl ADD v3 int");
-            assertRows(cluster.coordinator(1).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
-            assertRows(cluster.coordinator(1).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected1);
-            assertRows(cluster.coordinator(2).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
-            assertRows(cluster.coordinator(2).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected2);
+            cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, 
ck int, v1 int, v2 int,  primary key (pk, ck))");
+            String name = "v10";
+            cluster.get(1).schemaChangeInternal("ALTER TABLE " + KEYSPACE + ".tbl ADD " + name + " list<int>");
+            cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) values (?,1,1,1)", 1);
+            selectSilent(cluster, name);
+            cluster.get(2).flush(KEYSPACE);
+            cluster.get(2).schemaChangeInternal("ALTER TABLE " + KEYSPACE + ".tbl ADD " + name + " list<int>");
+            cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2, " + name + ") values (?,1,1,1,[1])", 1);
+            cluster.get(2).flush(KEYSPACE);
+            cluster.get(2).forceCompact(KEYSPACE, "tbl");
+            cluster.get(2).shutdown();
+            cluster.get(2).startup();
+            cluster.get(2).forceCompact(KEYSPACE, "tbl");
         }
     }
 
-    @Test
-    public void addDropColumnMixedMode() throws Throwable
+    private void selectSilent(Cluster cluster, String name)
     {
-        try (Cluster cluster = init(Cluster.create(2, CONFIG_CONSUMER)))
+        try
+        {
+            cluster.coordinator(1).execute(withKeyspace("SELECT * FROM %s.tbl WHERE pk = ?"), ConsistencyLevel.ALL, 1);
+        }
+        catch (Exception e)
         {
-            cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int 
primary key, v1 int, v2 int)");
-            Object [][] someExpected = new Object[5][];
-            Object [][] allExpected1 = new Object[5][];
-            Object [][] allExpected2 = new Object[5][];
-            for (int i = 0; i < 5; i++)
+            boolean causeIsUnknownColumn = false;
+            Throwable cause = e;
+            while (cause != null)
             {
-                int v1 = i * 10, v2 = i * 100;
-                cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl 
(id, v1, v2) VALUES (?,?,?)" , ConsistencyLevel.ALL, i, v1, v2);
-                someExpected[i] = new Object[] {i, v1};
-                allExpected1[i] = new Object[] {i, v1, v2, null};
-                allExpected2[i] = new Object[] {i, v1};
+                if (cause.getMessage() != null && cause.getMessage().contains("Unknown column "+name+" during deserialization"))
+                    causeIsUnknownColumn = true;
+                cause = cause.getCause();
             }
-            cluster.forEach((instance) -> instance.flush(KEYSPACE));
-            cluster.get(1).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl ADD v3 int");
-            cluster.get(2).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl DROP v2");
-            assertRows(cluster.coordinator(1).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
-            assertRows(cluster.coordinator(1).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected1);
-            assertRows(cluster.coordinator(2).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
-            assertRows(cluster.coordinator(2).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected2);
+            assertTrue(causeIsUnknownColumn);
         }
     }
 }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
index 4f205f4..3e8b76b 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
@@ -110,9 +110,6 @@ public class SimpleReadWriteTest extends SharedClusterTestBase
                    row(1, 1, 1));
     }
 
-    /**
-     * If a node receives a mutation for a column it's not aware of, it should fail, since it can't write the data.
-     */
     @Test
     public void writeWithSchemaDisagreement() throws Throwable
     {
@@ -129,7 +126,7 @@ public class SimpleReadWriteTest extends SharedClusterTestBase
         try
         {
             cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl 
(pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
-                                           ConsistencyLevel.ALL);
+                                           ConsistencyLevel.QUORUM);
         }
         catch (RuntimeException e)
         {
@@ -137,64 +134,9 @@ public class SimpleReadWriteTest extends SharedClusterTestBase
         }
 
         Assert.assertTrue(thrown.getMessage().contains("Exception occurred on node"));
+        Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2 during deserialization"));
     }
 
-    /**
-     * If a node receives a mutation for a column it knows has been dropped, the write should succeed
-     */
-    @Test
-    public void writeWithSchemaDisagreement2() throws Throwable
-    {
-        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck 
int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
-
-        cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
-        cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
-        cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)");
-
-        for (int i=0; i<cluster.size(); i++)
-            cluster.get(i+1).flush(KEYSPACE);;
-
-        // Introduce schema disagreement
-        cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl DROP v2", 1);
-
-        // execute a write including the dropped column where the coordinator is not yet aware of the drop
-        // all nodes should process this without error
-        cluster.coordinator(2).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
-                                           ConsistencyLevel.ALL);
-        // and flushing should also be fine
-        for (int i=0; i<cluster.size(); i++)
-            cluster.get(i+1).flush(KEYSPACE);;
-        // the results of reads will vary depending on whether the coordinator has seen the schema change
-        // note: read repairs will propagate the v2 value to node1, but this is safe and handled correctly
-        assertRows(cluster.coordinator(2).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL),
-                   rows(row(1,1,1,1), row(2,2,2,2)));
-        assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL),
-                   rows(row(1,1,1), row(2,2,2)));
-    }
-
-    /**
-     * If a node isn't aware of a column, but receives a mutation without that column, the write should succeed
-     */
-    @Test
-    public void writeWithInconsequentialSchemaDisagreement() throws Throwable
-    {
-        cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck 
int, v1 int, PRIMARY KEY (pk, ck))");
-
-        cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
-        cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
-        cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)");
-
-        // Introduce schema disagreement
-        cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
-
-        // this write shouldn't cause any problems because it doesn't write to the new column
-        cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (2, 2, 2)",
-                                       ConsistencyLevel.ALL);
-    }
-
-    /**
-     * If a node receives a read for a column it's not aware of, it shouldn't complain, since it won't have any data for that column
-     */
     @Test
     public void readWithSchemaDisagreement() throws Throwable
     {
@@ -207,8 +149,20 @@ public class SimpleReadWriteTest extends SharedClusterTestBase
         // Introduce schema disagreement
         cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
 
-        Object[][] expected = new Object[][]{new Object[]{1, 1, 1, null}};
-        assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.ALL), expected);
+        Exception thrown = null;
+        try
+        {
+            assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1",
+                                                      ConsistencyLevel.ALL),
+                       row(1, 1, 1, null));
+        }
+        catch (Exception e)
+        {
+            thrown = e;
+        }
+
+        Assert.assertTrue(thrown.getMessage().contains("Exception occurred on node"));
+        Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2 during deserialization"));
     }
 
     @Test
@@ -438,12 +392,4 @@ public class SimpleReadWriteTest extends SharedClusterTestBase
     {
         return instance.callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.readLatency.latency.getCount());
     }
-
-    private static Object[][] rows(Object[]...rows)
-    {
-        Object[][] r = new Object[rows.length][];
-        System.arraycopy(rows, 0, r, 0, rows.length);
-        return r;
-    }
-
 }
diff --git a/test/unit/org/apache/cassandra/db/ColumnsTest.java b/test/unit/org/apache/cassandra/db/ColumnsTest.java
index 444e79a..9498e8b 100644
--- a/test/unit/org/apache/cassandra/db/ColumnsTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnsTest.java
@@ -132,7 +132,7 @@ public class ColumnsTest
         {
             Columns.serializer.serialize(columns, out);
             Assert.assertEquals(Columns.serializer.serializedSize(columns), out.buffer().remaining());
-            Columns deserialized = Columns.serializer.deserializeRegulars(new DataInputBuffer(out.buffer(), false), mock(columns));
+            Columns deserialized = Columns.serializer.deserialize(new DataInputBuffer(out.buffer(), false), mock(columns));
             Assert.assertEquals(columns, deserialized);
             Assert.assertEquals(columns.hashCode(), deserialized.hashCode());
             assertContents(deserialized, definitions);

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
