This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch cassandra-3.0
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cassandra-3.0 by this push:
new 1d87da3 Adding columns via ALTER TABLE can generate corrupt sstables
1d87da3 is described below
commit 1d87da3f6fc0eca4e805238c19db16e6607b44a7
Author: Marcus Eriksson <[email protected]>
AuthorDate: Mon Jun 14 11:42:58 2021 +0200
Adding columns via ALTER TABLE can generate corrupt sstables
Patch by marcuse; reviewed by Alex Petrov and Sam Tunnicliffe for
CASSANDRA-16735
---
CHANGES.txt | 1 +
.../apache/cassandra/config/ColumnDefinition.java | 23 -----
src/java/org/apache/cassandra/db/Columns.java | 19 +---
.../apache/cassandra/db/SerializationHeader.java | 17 +---
.../cassandra/db/UnknownColumnException.java | 12 +--
.../apache/cassandra/db/filter/ColumnFilter.java | 8 +-
.../cassandra/db/partitions/PartitionUpdate.java | 7 --
.../cassandra/db/rows/UnfilteredSerializer.java | 19 +---
.../cassandra/distributed/test/SchemaTest.java | 112 ++++++++-------------
.../distributed/test/SimpleReadWriteTest.java | 86 +++-------------
test/unit/org/apache/cassandra/db/ColumnsTest.java | 2 +-
11 files changed, 76 insertions(+), 230 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 335050d..91d85be 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
3.0.25:
+ * Adding columns via ALTER TABLE can generate corrupt sstables
(CASSANDRA-16735)
* Add flag to disable ALTER...DROP COMPACT STORAGE statements
(CASSANDRA-16733)
* Clean transaction log leftovers at the beginning of sstablelevelreset and
sstableofflinerelevel (CASSANDRA-12519)
* CQL shell should prefer newer TLS version by default (CASSANDRA-16695)
diff --git a/src/java/org/apache/cassandra/config/ColumnDefinition.java
b/src/java/org/apache/cassandra/config/ColumnDefinition.java
index 93c89b5..6f7f749 100644
--- a/src/java/org/apache/cassandra/config/ColumnDefinition.java
+++ b/src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -190,29 +190,6 @@ public class ColumnDefinition extends ColumnSpecification
implements Comparable<
};
}
- private static class Placeholder extends ColumnDefinition
- {
- Placeholder(CFMetaData table, ByteBuffer name, AbstractType<?> type,
int position, Kind kind)
- {
- super(table, name, type, position, kind);
- }
-
- public boolean isPlaceholder()
- {
- return true;
- }
- }
-
- public static ColumnDefinition placeholder(CFMetaData table, ByteBuffer
name, boolean isStatic)
- {
- return new Placeholder(table, name, EmptyType.instance, NO_POSITION,
isStatic ? Kind.STATIC : Kind.REGULAR);
- }
-
- public boolean isPlaceholder()
- {
- return false;
- }
-
public ColumnDefinition copy()
{
return new ColumnDefinition(ksName, cfName, name, type, position,
kind);
diff --git a/src/java/org/apache/cassandra/db/Columns.java
b/src/java/org/apache/cassandra/db/Columns.java
index ef32fe0..18e17d7 100644
--- a/src/java/org/apache/cassandra/db/Columns.java
+++ b/src/java/org/apache/cassandra/db/Columns.java
@@ -425,7 +425,7 @@ public class Columns extends
AbstractCollection<ColumnDefinition> implements Col
return size;
}
- public Columns deserialize(DataInputPlus in, CFMetaData metadata,
boolean isStatic) throws IOException
+ public Columns deserialize(DataInputPlus in, CFMetaData metadata)
throws IOException
{
int length = (int)in.readUnsignedVInt();
BTree.Builder<ColumnDefinition> builder =
BTree.builder(Comparator.naturalOrder());
@@ -441,29 +441,14 @@ public class Columns extends
AbstractCollection<ColumnDefinition> implements Col
// fail deserialization because of that. So we grab a
"fake" ColumnDefinition that ensure proper
// deserialization. The column will be ignore later on
anyway.
column = metadata.getDroppedColumnDefinition(name);
-
- // If there's no dropped column, it may be for a column we
haven't received a schema update for yet
- // so we create a placeholder column. If this is a read,
the placeholder column will let the response
- // serializer know we're not serializing all requested
columns when it writes the row flags, but it
- // will cause mutations that try to write values for this
column to fail.
if (column == null)
- column = ColumnDefinition.placeholder(metadata, name,
isStatic);
+ throw new RuntimeException("Unknown column " +
UTF8Type.instance.getString(name) + " during deserialization");
}
builder.add(column);
}
return new Columns(builder.build());
}
- public Columns deserializeStatics(DataInputPlus in, CFMetaData
metadata) throws IOException
- {
- return deserialize(in, metadata, true);
- }
-
- public Columns deserializeRegulars(DataInputPlus in, CFMetaData
metadata) throws IOException
- {
- return deserialize(in, metadata, false);
- }
-
/**
* If both ends have a pre-shared superset of the columns we are
serializing, we can send them much
* more efficiently. Both ends must provide the identically same set
of columns.
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java
b/src/java/org/apache/cassandra/db/SerializationHeader.java
index 428acd0..5c4f518 100644
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -40,7 +40,6 @@ import
org.apache.cassandra.io.sstable.metadata.IMetadataComponentSerializer;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.SearchIterator;
public class SerializationHeader
{
@@ -161,18 +160,6 @@ public class SerializationHeader
return !columns.statics.isEmpty();
}
- public boolean hasAllColumns(Row row, boolean isStatic)
- {
- SearchIterator<ColumnDefinition, ColumnData> rowIter =
row.searchIterator();
- Iterable<ColumnDefinition> columns = isStatic ? columns().statics :
columns().regulars;
- for (ColumnDefinition column : columns)
- {
- if (rowIter.next(column) == null)
- return false;
- }
- return true;
- }
-
public boolean isForSSTable()
{
return isForSSTable;
@@ -455,8 +442,8 @@ public class SerializationHeader
Columns statics, regulars;
if (selection == null)
{
- statics = hasStatic ?
Columns.serializer.deserializeStatics(in, metadata) : Columns.NONE;
- regulars = Columns.serializer.deserializeRegulars(in,
metadata);
+ statics = hasStatic ? Columns.serializer.deserialize(in,
metadata) : Columns.NONE;
+ regulars = Columns.serializer.deserialize(in, metadata);
}
else
{
diff --git a/src/java/org/apache/cassandra/db/UnknownColumnException.java
b/src/java/org/apache/cassandra/db/UnknownColumnException.java
index cec60ea..55dc453 100644
--- a/src/java/org/apache/cassandra/db/UnknownColumnException.java
+++ b/src/java/org/apache/cassandra/db/UnknownColumnException.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db;
-import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.cassandra.config.CFMetaData;
@@ -28,19 +27,14 @@ import org.apache.cassandra.utils.ByteBufferUtil;
* Exception thrown when we read a column internally that is unknown. Note that
* this is an internal exception and is not meant to be user facing.
*/
-public class UnknownColumnException extends IOException
+public class UnknownColumnException extends Exception
{
public final ByteBuffer columnName;
- public UnknownColumnException(String ksName, String cfName, ByteBuffer
columnName)
- {
- super(String.format("Unknown column %s in table %s.%s",
stringify(columnName), ksName, cfName));
- this.columnName = columnName;
- }
-
public UnknownColumnException(CFMetaData metadata, ByteBuffer columnName)
{
- this(metadata.ksName, metadata.cfName, columnName);
+ super(String.format("Unknown column %s in table %s.%s",
stringify(columnName), metadata.ksName, metadata.cfName));
+ this.columnName = columnName;
}
private static String stringify(ByteBuffer name)
diff --git a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
index cbc8871..c658c12 100644
--- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
+++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java
@@ -396,8 +396,8 @@ public class ColumnFilter
{
if (version >= MessagingService.VERSION_3014)
{
- Columns statics =
Columns.serializer.deserializeStatics(in, metadata);
- Columns regulars =
Columns.serializer.deserializeRegulars(in, metadata);
+ Columns statics = Columns.serializer.deserialize(in,
metadata);
+ Columns regulars = Columns.serializer.deserialize(in,
metadata);
fetched = new PartitionColumns(statics, regulars);
}
else
@@ -408,8 +408,8 @@ public class ColumnFilter
if (hasSelection)
{
- Columns statics = Columns.serializer.deserializeStatics(in,
metadata);
- Columns regulars = Columns.serializer.deserializeRegulars(in,
metadata);
+ Columns statics = Columns.serializer.deserialize(in, metadata);
+ Columns regulars = Columns.serializer.deserialize(in,
metadata);
selection = new PartitionColumns(statics, regulars);
}
diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
index 3a67073..3560e90 100644
--- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
+++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java
@@ -17,7 +17,6 @@
*/
package org.apache.cassandra.db.partitions;
-import java.io.IOError;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -754,12 +753,6 @@ public class PartitionUpdate extends AbstractBTreePartition
deletionBuilder.add((RangeTombstoneMarker)unfiltered);
}
}
- catch (IOError e)
- {
- if (e.getCause() != null && e.getCause() instanceof
UnknownColumnException)
- throw (UnknownColumnException) e.getCause();
- throw e;
- }
MutableDeletionInfo deletionInfo = deletionBuilder.build();
return new PartitionUpdate(metadata,
diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
index 9e11f94..0342e39 100644
--- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
+++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java
@@ -19,9 +19,9 @@ package org.apache.cassandra.db.rows;
import java.io.IOException;
+import com.google.common.collect.Collections2;
import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.db.*;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
@@ -133,7 +133,7 @@ public class UnfilteredSerializer
LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
Row.Deletion deletion = row.deletion();
boolean hasComplexDeletion = row.hasComplexDeletion();
- boolean hasAllColumns = header.hasAllColumns(row, isStatic);
+ boolean hasAllColumns = (row.columnCount() == headerColumns.size());
boolean hasExtendedFlags = hasExtendedFlags(row);
if (isStatic)
@@ -192,12 +192,7 @@ public class UnfilteredSerializer
// with. So we use the ColumnDefinition from the "header" which is
"current". Also see #11810 for what
// happens if we don't do that.
ColumnDefinition column = si.next(data.column());
-
- // we may have columns that the remote node isn't aware of due to
inflight schema changes
- // in cases where it tries to fetch all columns, it will set the
`all columns` flag, but only
- // expect a subset of columns (from this node's perspective). See
CASSANDRA-15899
- if (column == null)
- continue;
+ assert column != null;
if (data.column.isSimple())
Cell.serializer.serialize((Cell) data, column, out,
pkLiveness, header);
@@ -279,7 +274,7 @@ public class UnfilteredSerializer
LivenessInfo pkLiveness = row.primaryKeyLivenessInfo();
Row.Deletion deletion = row.deletion();
boolean hasComplexDeletion = row.hasComplexDeletion();
- boolean hasAllColumns = header.hasAllColumns(row, isStatic);
+ boolean hasAllColumns = (row.columnCount() == headerColumns.size());
if (!pkLiveness.isEmpty())
size += header.timestampSerializedSize(pkLiveness.timestamp());
@@ -298,8 +293,7 @@ public class UnfilteredSerializer
for (ColumnData data : row)
{
ColumnDefinition column = si.next(data.column());
- if (column == null)
- continue;
+ assert column != null;
if (data.column.isSimple())
size += Cell.serializer.serializedSize((Cell) data, column,
pkLiveness, header);
@@ -490,9 +484,6 @@ public class UnfilteredSerializer
Columns columns = hasAllColumns ? headerColumns :
Columns.serializer.deserializeSubset(headerColumns, in);
for (ColumnDefinition column : columns)
{
- // if the column is a placeholder, then it's not part of our
schema, and we can't deserialize it
- if (column.isPlaceholder())
- throw new UnknownColumnException(column.ksName,
column.cfName, column.name.bytes);
if (column.isSimple())
readSimpleColumn(column, in, header, helper, builder,
rowLiveness);
else
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java
b/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java
index 2b5dab1..b8860fd 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java
@@ -18,100 +18,72 @@
package org.apache.cassandra.distributed.test;
-import java.util.function.Consumer;
-
import org.junit.Test;
-import org.apache.cassandra.dht.ByteOrderedPartitioner;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
-import org.apache.cassandra.distributed.api.IInstanceConfig;
-import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
+import static org.junit.Assert.assertTrue;
public class SchemaTest extends TestBaseImpl
{
- private static final Consumer<IInstanceConfig> CONFIG_CONSUMER = config ->
{
- config.set("partitioner",
ByteOrderedPartitioner.class.getSimpleName());
- config.set("initial_token", Integer.toString(config.num() * 1000));
- };
-
@Test
- public void dropColumnMixedMode() throws Throwable
+ public void readRepair() throws Throwable
{
- try (Cluster cluster = init(Cluster.create(2, CONFIG_CONSUMER)))
+ try (Cluster cluster = init(Cluster.build(2).start()))
{
- cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int
primary key, v1 int, v2 int, v3 int)");
- Object [][] someExpected = new Object[5][];
- Object [][] allExpected1 = new Object[5][];
- Object [][] allExpected2 = new Object[5][];
- for (int i = 0; i < 5; i++)
- {
- int v1 = i * 10, v2 = i * 100, v3 = i * 1000;
- cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl
(id, v1, v2, v3) VALUES (?,?,?, ?)" , ConsistencyLevel.ALL, i, v1, v2, v3);
- someExpected[i] = new Object[] {i, v1};
- allExpected1[i] = new Object[] {i, v1, v3};
- allExpected2[i] = new Object[] {i, v1, v2, v3};
- }
- cluster.forEach((instance) -> instance.flush(KEYSPACE));
- cluster.get(1).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl
DROP v2");
- assertRows(cluster.coordinator(1).execute("SELECT id, v1 FROM
"+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
- assertRows(cluster.coordinator(1).execute("SELECT * FROM
"+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected1);
- assertRows(cluster.coordinator(2).execute("SELECT id, v1 FROM
"+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
- assertRows(cluster.coordinator(2).execute("SELECT * FROM
"+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected2);
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int,
ck int, v1 int, v2 int, primary key (pk, ck))");
+ String name = "aaa";
+ cluster.get(1).schemaChangeInternal("ALTER TABLE " + KEYSPACE +
".tbl ADD " + name + " list<int>");
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl
(pk, ck, v1, v2) values (?,1,1,1)", 1);
+ selectSilent(cluster, name);
+
+ cluster.get(2).flush(KEYSPACE);
+ cluster.get(2).schemaChangeInternal("ALTER TABLE " + KEYSPACE +
".tbl ADD " + name + " list<int>");
+ cluster.get(2).shutdown();
+ cluster.get(2).startup();
+ cluster.get(2).forceCompact(KEYSPACE, "tbl");
}
}
@Test
- public void addColumnMixedMode() throws Throwable
+ public void readRepairWithCompaction() throws Throwable
{
- try (Cluster cluster = init(Cluster.create(2, CONFIG_CONSUMER)))
+ try (Cluster cluster = init(Cluster.build(2).start()))
{
- cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int
primary key, v1 int, v2 int)");
- Object [][] someExpected = new Object[5][];
- Object [][] allExpected1 = new Object[5][];
- Object [][] allExpected2 = new Object[5][];
- for (int i = 0; i < 5; i++)
- {
- int v1 = i * 10, v2 = i * 100;
- cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl
(id, v1, v2) VALUES (?,?,?)" , ConsistencyLevel.ALL, i, v1, v2);
- someExpected[i] = new Object[] {i, v1};
- allExpected1[i] = new Object[] {i, v1, v2, null};
- allExpected2[i] = new Object[] {i, v1, v2};
- }
- cluster.forEach((instance) -> instance.flush(KEYSPACE));
- cluster.get(1).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl
ADD v3 int");
- assertRows(cluster.coordinator(1).execute("SELECT id, v1 FROM
"+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
- assertRows(cluster.coordinator(1).execute("SELECT * FROM
"+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected1);
- assertRows(cluster.coordinator(2).execute("SELECT id, v1 FROM
"+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
- assertRows(cluster.coordinator(2).execute("SELECT * FROM
"+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected2);
+ cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int,
ck int, v1 int, v2 int, primary key (pk, ck))");
+ String name = "v10";
+ cluster.get(1).schemaChangeInternal("ALTER TABLE " + KEYSPACE +
".tbl ADD " + name + " list<int>");
+ cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl
(pk, ck, v1, v2) values (?,1,1,1)", 1);
+ selectSilent(cluster, name);
+ cluster.get(2).flush(KEYSPACE);
+ cluster.get(2).schemaChangeInternal("ALTER TABLE " + KEYSPACE +
".tbl ADD " + name + " list<int>");
+ cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl
(pk, ck, v1, v2, " + name + ") values (?,1,1,1,[1])", 1);
+ cluster.get(2).flush(KEYSPACE);
+ cluster.get(2).forceCompact(KEYSPACE, "tbl");
+ cluster.get(2).shutdown();
+ cluster.get(2).startup();
+ cluster.get(2).forceCompact(KEYSPACE, "tbl");
}
}
- @Test
- public void addDropColumnMixedMode() throws Throwable
+ private void selectSilent(Cluster cluster, String name)
{
- try (Cluster cluster = init(Cluster.create(2, CONFIG_CONSUMER)))
+ try
+ {
+ cluster.coordinator(1).execute(withKeyspace("SELECT * FROM %s.tbl
WHERE pk = ?"), ConsistencyLevel.ALL, 1);
+ }
+ catch (Exception e)
{
- cluster.schemaChange("CREATE TABLE "+KEYSPACE+".tbl (id int
primary key, v1 int, v2 int)");
- Object [][] someExpected = new Object[5][];
- Object [][] allExpected1 = new Object[5][];
- Object [][] allExpected2 = new Object[5][];
- for (int i = 0; i < 5; i++)
+ boolean causeIsUnknownColumn = false;
+ Throwable cause = e;
+ while (cause != null)
{
- int v1 = i * 10, v2 = i * 100;
- cluster.coordinator(1).execute("INSERT INTO "+KEYSPACE+".tbl
(id, v1, v2) VALUES (?,?,?)" , ConsistencyLevel.ALL, i, v1, v2);
- someExpected[i] = new Object[] {i, v1};
- allExpected1[i] = new Object[] {i, v1, v2, null};
- allExpected2[i] = new Object[] {i, v1};
+ if (cause.getMessage() != null &&
cause.getMessage().contains("Unknown column "+name+" during deserialization"))
+ causeIsUnknownColumn = true;
+ cause = cause.getCause();
}
- cluster.forEach((instance) -> instance.flush(KEYSPACE));
- cluster.get(1).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl
ADD v3 int");
- cluster.get(2).schemaChangeInternal("ALTER TABLE "+KEYSPACE+".tbl
DROP v2");
- assertRows(cluster.coordinator(1).execute("SELECT id, v1 FROM
"+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
- assertRows(cluster.coordinator(1).execute("SELECT * FROM
"+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected1);
- assertRows(cluster.coordinator(2).execute("SELECT id, v1 FROM
"+KEYSPACE+".tbl", ConsistencyLevel.ALL), someExpected);
- assertRows(cluster.coordinator(2).execute("SELECT * FROM
"+KEYSPACE+".tbl", ConsistencyLevel.ALL), allExpected2);
+ assertTrue(causeIsUnknownColumn);
}
}
}
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
index 4f205f4..3e8b76b 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java
@@ -110,9 +110,6 @@ public class SimpleReadWriteTest extends
SharedClusterTestBase
row(1, 1, 1));
}
- /**
- * If a node receives a mutation for a column it's not aware of, it should
fail, since it can't write the data.
- */
@Test
public void writeWithSchemaDisagreement() throws Throwable
{
@@ -129,7 +126,7 @@ public class SimpleReadWriteTest extends
SharedClusterTestBase
try
{
cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl
(pk, ck, v1, v2) VALUES (2, 2, 2, 2)",
- ConsistencyLevel.ALL);
+ ConsistencyLevel.QUORUM);
}
catch (RuntimeException e)
{
@@ -137,64 +134,9 @@ public class SimpleReadWriteTest extends
SharedClusterTestBase
}
Assert.assertTrue(thrown.getMessage().contains("Exception occurred on
node"));
+
Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown
column v2 during deserialization"));
}
- /**
- * If a node receives a mutation for a column it knows has been dropped,
the write should succeed
- */
- @Test
- public void writeWithSchemaDisagreement2() throws Throwable
- {
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck
int, v1 int, v2 int, PRIMARY KEY (pk, ck))");
-
- cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk,
ck, v1, v2) VALUES (1, 1, 1, 1)");
- cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk,
ck, v1, v2) VALUES (1, 1, 1, 1)");
- cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk,
ck, v1, v2) VALUES (1, 1, 1, 1)");
-
- for (int i=0; i<cluster.size(); i++)
- cluster.get(i+1).flush(KEYSPACE);;
-
- // Introduce schema disagreement
- cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl DROP v2", 1);
-
- // execute a write including the dropped column where the coordinator
is not yet aware of the drop
- // all nodes should process this without error
- cluster.coordinator(2).execute("INSERT INTO " + KEYSPACE + ".tbl (pk,
ck, v1, v2) VALUES (2, 2, 2, 2)",
- ConsistencyLevel.ALL);
- // and flushing should also be fine
- for (int i=0; i<cluster.size(); i++)
- cluster.get(i+1).flush(KEYSPACE);;
- // the results of reads will vary depending on whether the coordinator
has seen the schema change
- // note: read repairs will propagate the v2 value to node1, but this
is safe and handled correctly
- assertRows(cluster.coordinator(2).execute("SELECT * FROM " + KEYSPACE
+ ".tbl", ConsistencyLevel.ALL),
- rows(row(1,1,1,1), row(2,2,2,2)));
- assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE
+ ".tbl", ConsistencyLevel.ALL),
- rows(row(1,1,1), row(2,2,2)));
- }
-
- /**
- * If a node isn't aware of a column, but receives a mutation without that
column, the write should succeed
- */
- @Test
- public void writeWithInconsequentialSchemaDisagreement() throws Throwable
- {
- cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck
int, v1 int, PRIMARY KEY (pk, ck))");
-
- cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk,
ck, v1) VALUES (1, 1, 1)");
- cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk,
ck, v1) VALUES (1, 1, 1)");
- cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk,
ck, v1) VALUES (1, 1, 1)");
-
- // Introduce schema disagreement
- cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
-
- // this write shouldn't cause any problems because it doesn't write to
the new column
- cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk,
ck, v1) VALUES (2, 2, 2)",
- ConsistencyLevel.ALL);
- }
-
- /**
- * If a node receives a read for a column it's not aware of, it shouldn't
complain, since it won't have any data for that column
- */
@Test
public void readWithSchemaDisagreement() throws Throwable
{
@@ -207,8 +149,20 @@ public class SimpleReadWriteTest extends
SharedClusterTestBase
// Introduce schema disagreement
cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1);
- Object[][] expected = new Object[][]{new Object[]{1, 1, 1, null}};
- assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE
+ ".tbl WHERE pk = 1", ConsistencyLevel.ALL), expected);
+ Exception thrown = null;
+ try
+ {
+ assertRows(cluster.coordinator(1).execute("SELECT * FROM " +
KEYSPACE + ".tbl WHERE pk = 1",
+ ConsistencyLevel.ALL),
+ row(1, 1, 1, null));
+ }
+ catch (Exception e)
+ {
+ thrown = e;
+ }
+
+ Assert.assertTrue(thrown.getMessage().contains("Exception occurred on
node"));
+
Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown
column v2 during deserialization"));
}
@Test
@@ -438,12 +392,4 @@ public class SimpleReadWriteTest extends
SharedClusterTestBase
{
return instance.callOnInstance(() ->
Keyspace.open(KEYSPACE).getColumnFamilyStore("tbl").metric.readLatency.latency.getCount());
}
-
- private static Object[][] rows(Object[]...rows)
- {
- Object[][] r = new Object[rows.length][];
- System.arraycopy(rows, 0, r, 0, rows.length);
- return r;
- }
-
}
diff --git a/test/unit/org/apache/cassandra/db/ColumnsTest.java
b/test/unit/org/apache/cassandra/db/ColumnsTest.java
index 444e79a..9498e8b 100644
--- a/test/unit/org/apache/cassandra/db/ColumnsTest.java
+++ b/test/unit/org/apache/cassandra/db/ColumnsTest.java
@@ -132,7 +132,7 @@ public class ColumnsTest
{
Columns.serializer.serialize(columns, out);
Assert.assertEquals(Columns.serializer.serializedSize(columns),
out.buffer().remaining());
- Columns deserialized = Columns.serializer.deserializeRegulars(new
DataInputBuffer(out.buffer(), false), mock(columns));
+ Columns deserialized = Columns.serializer.deserialize(new
DataInputBuffer(out.buffer(), false), mock(columns));
Assert.assertEquals(columns, deserialized);
Assert.assertEquals(columns.hashCode(), deserialized.hashCode());
assertContents(deserialized, definitions);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]