This is an automated email from the ASF dual-hosted git repository. bdeggleston pushed a commit to branch cassandra-3.0 in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push: new 31b9078 Handle unexpected columns due to schema races 31b9078 is described below commit 31b9078a691a6f93b104cc6b3f72fe2fbf6557f6 Author: Blake Eggleston <bdeggles...@gmail.com> AuthorDate: Mon Oct 5 14:17:38 2020 -0700 Handle unexpected columns due to schema races Patch by Blake Eggleston; Reviewed by Sam Tunnicliffe for CASSANDRA-15899 --- CHANGES.txt | 1 + .../apache/cassandra/config/ColumnDefinition.java | 23 ++++ src/java/org/apache/cassandra/db/Columns.java | 19 +++- .../apache/cassandra/db/SerializationHeader.java | 17 ++- .../cassandra/db/UnknownColumnException.java | 12 ++- .../apache/cassandra/db/filter/ColumnFilter.java | 8 +- .../cassandra/db/partitions/PartitionUpdate.java | 7 ++ .../cassandra/db/rows/UnfilteredSerializer.java | 19 +++- .../cassandra/distributed/test/SchemaTest.java | 117 +++++++++++++++++++++ .../distributed/test/SimpleReadWriteTest.java | 86 ++++++++++++--- test/unit/org/apache/cassandra/db/ColumnsTest.java | 2 +- 11 files changed, 278 insertions(+), 33 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 5f326ce..1ea5184 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.23: + * Handle unexpected columns due to schema races (CASSANDRA-15899) * Avoid failing compactions with very large partitions (CASSANDRA-15164) * Use IF NOT EXISTS for index and UDT create statements in snapshot schema files (CASSANDRA-13935) * Add flag to ignore unreplicated keyspaces during repair (CASSANDRA-15160) diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java b/src/java/org/apache/cassandra/config/ColumnDefinition.java index 6f7f749..93c89b5 100644 --- a/src/java/org/apache/cassandra/config/ColumnDefinition.java +++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java @@ -190,6 +190,29 @@ public class ColumnDefinition extends ColumnSpecification implements Comparable< }; } + private static class Placeholder extends ColumnDefinition + { + Placeholder(CFMetaData table, ByteBuffer name, AbstractType<?> type, int position, Kind kind) + { + super(table, name, type, position, kind); + } + + public boolean isPlaceholder() + { + return true; + } + } + + public static ColumnDefinition placeholder(CFMetaData table, ByteBuffer name, boolean isStatic) + { + return new Placeholder(table, name, EmptyType.instance, NO_POSITION, isStatic ? Kind.STATIC : Kind.REGULAR); + } + + public boolean isPlaceholder() + { + return false; + } + public ColumnDefinition copy() { return new ColumnDefinition(ksName, cfName, name, type, position, kind); diff --git a/src/java/org/apache/cassandra/db/Columns.java b/src/java/org/apache/cassandra/db/Columns.java index 18e17d7..ef32fe0 100644 --- a/src/java/org/apache/cassandra/db/Columns.java +++ b/src/java/org/apache/cassandra/db/Columns.java @@ -425,7 +425,7 @@ public class Columns extends AbstractCollection<ColumnDefinition> implements Col return size; } - public Columns deserialize(DataInputPlus in, CFMetaData metadata) throws IOException + public Columns deserialize(DataInputPlus in, CFMetaData metadata, boolean isStatic) throws IOException { int length = (int)in.readUnsignedVInt(); BTree.Builder<ColumnDefinition> builder = BTree.builder(Comparator.naturalOrder()); @@ -441,14 +441,29 @@ public class Columns extends AbstractCollection<ColumnDefinition> implements Col // fail deserialization because of that. So we grab a "fake" ColumnDefinition that ensure proper // deserialization. The column will be ignore later on anyway. column = metadata.getDroppedColumnDefinition(name); + + // If there's no dropped column, it may be for a column we haven't received a schema update for yet + // so we create a placeholder column. If this is a read, the placeholder column will let the response + // serializer know we're not serializing all requested columns when it writes the row flags, but it + // will cause mutations that try to write values for this column to fail. if (column == null) - throw new RuntimeException("Unknown column " + UTF8Type.instance.getString(name) + " during deserialization"); + column = ColumnDefinition.placeholder(metadata, name, isStatic); } builder.add(column); } return new Columns(builder.build()); } + public Columns deserializeStatics(DataInputPlus in, CFMetaData metadata) throws IOException + { + return deserialize(in, metadata, true); + } + + public Columns deserializeRegulars(DataInputPlus in, CFMetaData metadata) throws IOException + { + return deserialize(in, metadata, false); + } + /** * If both ends have a pre-shared superset of the columns we are serializing, we can send them much * more efficiently. Both ends must provide the identically same set of columns. diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java index 5c4f518..428acd0 100644 --- a/src/java/org/apache/cassandra/db/SerializationHeader.java +++ b/src/java/org/apache/cassandra/db/SerializationHeader.java @@ -40,6 +40,7 @@ import org.apache.cassandra.io.sstable.metadata.IMetadataComponentSerializer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.SearchIterator; public class SerializationHeader { @@ -160,6 +161,18 @@ public class SerializationHeader return !columns.statics.isEmpty(); } + public boolean hasAllColumns(Row row, boolean isStatic) + { + SearchIterator<ColumnDefinition, ColumnData> rowIter = row.searchIterator(); + Iterable<ColumnDefinition> columns = isStatic ? columns().statics : columns().regulars; + for (ColumnDefinition column : columns) + { + if (rowIter.next(column) == null) + return false; + } + return true; + } + public boolean isForSSTable() { return isForSSTable; @@ -442,8 +455,8 @@ public class SerializationHeader Columns statics, regulars; if (selection == null) { - statics = hasStatic ? Columns.serializer.deserialize(in, metadata) : Columns.NONE; - regulars = Columns.serializer.deserialize(in, metadata); + statics = hasStatic ? Columns.serializer.deserializeStatics(in, metadata) : Columns.NONE; + regulars = Columns.serializer.deserializeRegulars(in, metadata); } else { diff --git a/src/java/org/apache/cassandra/db/UnknownColumnException.java b/src/java/org/apache/cassandra/db/UnknownColumnException.java index 55dc453..cec60ea 100644 --- a/src/java/org/apache/cassandra/db/UnknownColumnException.java +++ b/src/java/org/apache/cassandra/db/UnknownColumnException.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.db; +import java.io.IOException; import java.nio.ByteBuffer; import org.apache.cassandra.config.CFMetaData; @@ -27,16 +28,21 @@ import org.apache.cassandra.utils.ByteBufferUtil; * Exception thrown when we read a column internally that is unknown. Note that * this is an internal exception and is not meant to be user facing. */ -public class UnknownColumnException extends Exception +public class UnknownColumnException extends IOException { public final ByteBuffer columnName; - public UnknownColumnException(CFMetaData metadata, ByteBuffer columnName) + public UnknownColumnException(String ksName, String cfName, ByteBuffer columnName) { - super(String.format("Unknown column %s in table %s.%s", stringify(columnName), metadata.ksName, metadata.cfName)); + super(String.format("Unknown column %s in table %s.%s", stringify(columnName), ksName, cfName)); this.columnName = columnName; } + public UnknownColumnException(CFMetaData metadata, ByteBuffer columnName) + { + this(metadata.ksName, metadata.cfName, columnName); + } + private static String stringify(ByteBuffer name) { try diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java index c28c0ae..858c944 100644 --- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java @@ -431,8 +431,8 @@ public class ColumnFilter { if (version >= MessagingService.VERSION_3014) { - Columns statics = Columns.serializer.deserialize(in, metadata); - Columns regulars = Columns.serializer.deserialize(in, metadata); + Columns statics = Columns.serializer.deserializeStatics(in, metadata); + Columns regulars = Columns.serializer.deserializeRegulars(in, metadata); fetched = new PartitionColumns(statics, regulars); } else @@ -443,8 +443,8 @@ public class ColumnFilter if (hasSelection) { - Columns statics = Columns.serializer.deserialize(in, metadata); - Columns regulars = Columns.serializer.deserialize(in, metadata); + Columns statics = Columns.serializer.deserializeStatics(in, metadata); + Columns regulars = Columns.serializer.deserializeRegulars(in, metadata); selection = new PartitionColumns(statics, regulars); } diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java index 3560e90..3a67073 100644 --- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java +++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.db.partitions; +import java.io.IOError; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -753,6 +754,12 @@ public class PartitionUpdate extends AbstractBTreePartition deletionBuilder.add((RangeTombstoneMarker)unfiltered); } } + catch (IOError e) + { + if (e.getCause() != null && e.getCause() instanceof UnknownColumnException) + throw (UnknownColumnException) e.getCause(); + throw e; + } MutableDeletionInfo deletionInfo = deletionBuilder.build(); return new PartitionUpdate(metadata, diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java index 0342e39..9e11f94 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java @@ -19,9 +19,9 @@ package org.apache.cassandra.db.rows; import java.io.IOException; -import com.google.common.collect.Collections2; import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.*; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; @@ -133,7 +133,7 @@ public class UnfilteredSerializer LivenessInfo pkLiveness = row.primaryKeyLivenessInfo(); Row.Deletion deletion = row.deletion(); boolean hasComplexDeletion = row.hasComplexDeletion(); - boolean hasAllColumns = (row.columnCount() == headerColumns.size()); + boolean hasAllColumns = header.hasAllColumns(row, isStatic); boolean hasExtendedFlags = hasExtendedFlags(row); if (isStatic) @@ -192,7 +192,12 @@ public class UnfilteredSerializer // with. So we use the ColumnDefinition from the "header" which is "current". Also see #11810 for what // happens if we don't do that. ColumnDefinition column = si.next(data.column()); - assert column != null; + + // we may have columns that the remote node isn't aware of due to inflight schema changes + // in cases where it tries to fetch all columns, it will set the `all columns` flag, but only + // expect a subset of columns (from this node's perspective). See CASSANDRA-15899 + if (column == null) + continue; if (data.column.isSimple()) Cell.serializer.serialize((Cell) data, column, out, pkLiveness, header); @@ -274,7 +279,7 @@ public class UnfilteredSerializer LivenessInfo pkLiveness = row.primaryKeyLivenessInfo(); Row.Deletion deletion = row.deletion(); boolean hasComplexDeletion = row.hasComplexDeletion(); - boolean hasAllColumns = (row.columnCount() == headerColumns.size()); + boolean hasAllColumns = header.hasAllColumns(row, isStatic); if (!pkLiveness.isEmpty()) size += header.timestampSerializedSize(pkLiveness.timestamp()); @@ -293,7 +298,8 @@ public class UnfilteredSerializer for (ColumnData data : row) { ColumnDefinition column = si.next(data.column()); - assert column != null; + if (column == null) + continue; if (data.column.isSimple()) size += Cell.serializer.serializedSize((Cell) data, column, pkLiveness, header); @@ -484,6 +490,9 @@ public class UnfilteredSerializer Columns columns = hasAllColumns ? headerColumns : Columns.serializer.deserializeSubset(headerColumns, in); for (ColumnDefinition column : columns) { + // if the column is a placeholder, then it's not part of our schema, and we can't deserialize it + if (column.isPlaceholder()) + throw new UnknownColumnException(column.ksName, column.cfName, column.name.bytes); if (column.isSimple()) readSimpleColumn(column, in, header, helper, builder, rowLiveness); else diff --git a/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java b/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java new file mode 100644 index 0000000..2b5dab1 --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test; + +import java.util.function.Consumer; + +import org.junit.Test; + +import org.apache.cassandra.dht.ByteOrderedPartitioner; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.ConsistencyLevel; +import org.apache.cassandra.distributed.api.IInstanceConfig; + +import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows; + +public class SchemaTest extends TestBaseImpl +{ + private static final Consumer<IInstanceConfig> CONFIG_CONSUMER = config -> { + config.set("partitioner", ByteOrderedPartitioner.class.getSimpleName()); + config.set("initial_token", Integer.toString(config.num() * 1000)); + }; + + @Test + public void dropColumnMixedMode() throws Throwable + { + try (Cluster cluster = init(Cluster.create(2, CONFIG_CONSUMER))) + { + cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int primary key, v1 int, v2 int, v3 int)"); + Object [][] someExpected = new Object[5][]; + Object [][] allExpected1 = new Object[5][]; + Object [][] allExpected2 = new Object[5][]; + for (int i = 0; i < 5; i++) + { + int v1 = i * 10, v2 = i * 100, v3 = i * 1000; + cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (id, v1, v2, v3) VALUES (?,?,?, ?)" , ConsistencyLevel.ALL, i, v1, v2, v3); + someExpected[i] = new Object[] {i, v1}; + allExpected1[i] = new Object[] {i, v1, v3}; + allExpected2[i] = new Object[] {i, v1, v2, v3}; + } + cluster.forEach((instance) -> instance.flush(KEYSPACE)); + cluster.get(1).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl DROP v2"); + assertRows(cluster.coordinator(1).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected); + assertRows(cluster.coordinator(1).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected1); + assertRows(cluster.coordinator(2).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected); + assertRows(cluster.coordinator(2).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected2); + } + } + + @Test + public void addColumnMixedMode() throws Throwable + { + try (Cluster cluster = init(Cluster.create(2, CONFIG_CONSUMER))) + { + cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int primary key, v1 int, v2 int)"); + Object [][] someExpected = new Object[5][]; + Object [][] allExpected1 = new Object[5][]; + Object [][] allExpected2 = new Object[5][]; + for (int i = 0; i < 5; i++) + { + int v1 = i * 10, v2 = i * 100; + cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (id, v1, v2) VALUES (?,?,?)" , ConsistencyLevel.ALL, i, v1, v2); + someExpected[i] = new Object[] {i, v1}; + allExpected1[i] = new Object[] {i, v1, v2, null}; + allExpected2[i] = new Object[] {i, v1, v2}; + } + cluster.forEach((instance) -> instance.flush(KEYSPACE)); + cluster.get(1).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl ADD v3 int"); + assertRows(cluster.coordinator(1).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected); + assertRows(cluster.coordinator(1).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected1); + assertRows(cluster.coordinator(2).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected); + assertRows(cluster.coordinator(2).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected2); + } + } + + @Test + public void addDropColumnMixedMode() throws Throwable + { + try (Cluster cluster = init(Cluster.create(2, CONFIG_CONSUMER))) + { + cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int primary key, v1 int, v2 int)"); + Object [][] someExpected = new Object[5][]; + Object [][] allExpected1 = new Object[5][]; + Object [][] allExpected2 = new Object[5][]; + for (int i = 0; i < 5; i++) + { + int v1 = i * 10, v2 = i * 100; + cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl (id, v1, v2) VALUES (?,?,?)" , ConsistencyLevel.ALL, i, v1, v2); + someExpected[i] = new Object[] {i, v1}; + allExpected1[i] = new Object[] {i, v1, v2, null}; + allExpected2[i] = new Object[] {i, v1}; + } + cluster.forEach((instance) -> instance.flush(KEYSPACE)); + cluster.get(1).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl ADD v3 int"); + cluster.get(2).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl DROP v2"); + assertRows(cluster.coordinator(1).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected); + assertRows(cluster.coordinator(1).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected1); + assertRows(cluster.coordinator(2).execute("SELECT id, v1 FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected); + assertRows(cluster.coordinator(2).execute("SELECT * FROM "+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected2); + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java index 75e5ba9..17064fa 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java @@ -92,6 +92,9 @@ public class SimpleReadWriteTest extends SharedClusterTestBase row(1, 1, 1)); } + /** + * If a node receives a mutation for a column it's not aware of, it should fail, since it can't write the data. + */ @Test public void writeWithSchemaDisagreement() throws Throwable { @@ -108,7 +111,7 @@ public class SimpleReadWriteTest extends SharedClusterTestBase try { cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)", - ConsistencyLevel.QUORUM); + ConsistencyLevel.ALL); } catch (RuntimeException e) { @@ -116,11 +119,46 @@ public class SimpleReadWriteTest extends SharedClusterTestBase } Assert.assertTrue(thrown.getMessage().contains("Exception occurred on node")); - Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2 during deserialization")); } + /** + * If a node receives a mutation for a column it knows has been dropped, the write should succeed + */ @Test - public void readWithSchemaDisagreement() throws Throwable + public void writeWithSchemaDisagreement2() throws Throwable + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))"); + + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)"); + cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)"); + cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)"); + + for (int i=0; i<cluster.size(); i++) + cluster.get(i+1).flush(KEYSPACE);; + + // Introduce schema disagreement + cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl DROP v2", 1); + + // execute a write including the dropped column where the coordinator is not yet aware of the drop + // all nodes should process this without error + cluster.coordinator(2).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)", + ConsistencyLevel.ALL); + // and flushing should also be fine + for (int i=0; i<cluster.size(); i++) + cluster.get(i+1).flush(KEYSPACE);; + // the results of reads will vary depending on whether the coordinator has seen the schema change + // note: read repairs will propagate the v2 value to node1, but this is safe and handled correctly + assertRows(cluster.coordinator(2).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL), + rows(row(1,1,1,1), row(2,2,2,2))); + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL), + rows(row(1,1,1), row(2,2,2))); + } + + /** + * If a node isn't aware of a column, but receives a mutation without that column, the write should succeed + */ + @Test + public void writeWithInconsequentialSchemaDisagreement() throws Throwable { cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))"); @@ -131,20 +169,28 @@ public class SimpleReadWriteTest extends SharedClusterTestBase // Introduce schema disagreement cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1); - Exception thrown = null; - try - { - assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", - ConsistencyLevel.ALL), - row(1, 1, 1, null)); - } - catch (Exception e) - { - thrown = e; - } + // this write shouldn't cause any problems because it doesn't write to the new column + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (2, 2, 2)", + ConsistencyLevel.ALL); + } - Assert.assertTrue(thrown.getMessage().contains("Exception occurred on node")); - Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2 during deserialization")); + /** + * If a node receives a read for a column it's not aware of, it shouldn't complain, since it won't have any data for that column + */ + @Test + public void readWithSchemaDisagreement() throws Throwable + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))"); + + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); + cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); + cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); + + // Introduce schema disagreement + cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1); + + Object[][] expected = new Object[][]{new Object[]{1, 1, 1, null}}; + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.ALL), expected); } @Test @@ -374,4 +420,12 @@ public class SimpleReadWriteTest extends SharedClusterTestBase { return instance.callOnInstance(() -> Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.readLatency.latency.getCount()); } + + private static Object[][] rows(Object[]...rows) + { + Object[][] r = new Object[rows.length][]; + System.arraycopy(rows, 0, r, 0, rows.length); + return r; + } + } diff --git a/test/unit/org/apache/cassandra/db/ColumnsTest.java b/test/unit/org/apache/cassandra/db/ColumnsTest.java index 9498e8b..444e79a 100644 --- a/test/unit/org/apache/cassandra/db/ColumnsTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnsTest.java @@ -132,7 +132,7 @@ public class ColumnsTest { Columns.serializer.serialize(columns, out); Assert.assertEquals(Columns.serializer.serializedSize(columns), out.buffer().remaining()); - Columns deserialized = Columns.serializer.deserialize(new DataInputBuffer(out.buffer(), false), mock(columns)); + Columns deserialized = Columns.serializer.deserializeRegulars(new DataInputBuffer(out.buffer(), false), mock(columns)); Assert.assertEquals(columns, deserialized); Assert.assertEquals(columns.hashCode(), deserialized.hashCode()); assertContents(deserialized, definitions); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org