Repository: cassandra Updated Branches: refs/heads/cassandra-3.0 362e13206 -> e4c344c58 refs/heads/cassandra-3.9 c7547e0de -> 1f014b2ca refs/heads/trunk e42352763 -> 9242c85cf
Avoid deserialization error after altering column type This makes sure the column used when serializing intra-node messages is "current". Previously, we would use the type used during deserialization which could not be "current" due to an ALTER TYPE. patch by slebresne; reviewed by thobbs for CASSANDRA-11820 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/e4c344c5 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/e4c344c5 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/e4c344c5 Branch: refs/heads/cassandra-3.0 Commit: e4c344c58f8ca8f69224855080de4ec266fb671e Parents: 362e132 Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Mon Jun 27 14:17:27 2016 +0200 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Thu Jun 30 11:14:01 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../apache/cassandra/db/rows/BufferCell.java | 16 +++---- src/java/org/apache/cassandra/db/rows/Cell.java | 4 +- .../cassandra/db/rows/UnfilteredSerializer.java | 30 +++++++++---- .../org/apache/cassandra/cql3/CQLTester.java | 39 +++++++++++++++-- .../cql3/validation/operations/AlterTest.java | 46 +++++++++++++------- 6 files changed, 98 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4c344c5/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index ae37d2c..573f704 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.9 + * Fix EOF exception when altering column type (CASSANDRA-11820) Merged from 2.2: * MemoryUtil.getShort() should return an unsigned short also for architectures not supporting unaligned memory accesses (CASSANDRA-11973) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4c344c5/src/java/org/apache/cassandra/db/rows/BufferCell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/BufferCell.java b/src/java/org/apache/cassandra/db/rows/BufferCell.java index 22b629a..db0ded5 100644 --- a/src/java/org/apache/cassandra/db/rows/BufferCell.java +++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java @@ -228,7 +228,7 @@ public class BufferCell extends AbstractCell private final static int USE_ROW_TIMESTAMP_MASK = 0x08; // Wether the cell has the same timestamp than the row this is a cell of. private final static int USE_ROW_TTL_MASK = 0x10; // Wether the cell has the same ttl than the row this is a cell of. - public void serialize(Cell cell, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException + public void serialize(Cell cell, ColumnDefinition column, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException { assert cell != null; boolean hasValue = cell.value().hasRemaining(); @@ -260,11 +260,11 @@ public class BufferCell extends AbstractCell if (isExpiring && !useRowTTL) header.writeTTL(cell.ttl(), out); - if (cell.column().isComplex()) - cell.column().cellPathSerializer().serialize(cell.path(), out); + if (column.isComplex()) + column.cellPathSerializer().serialize(cell.path(), out); if (hasValue) - header.getType(cell.column()).writeValue(cell.value(), out); + header.getType(column).writeValue(cell.value(), out); } public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnDefinition column, SerializationHeader header, SerializationHelper helper) throws IOException @@ -308,7 +308,7 @@ public class BufferCell extends AbstractCell return new BufferCell(column, timestamp, ttl, localDeletionTime, value, path); } - public long serializedSize(Cell cell, LivenessInfo rowLiveness, SerializationHeader header) + public long serializedSize(Cell cell, ColumnDefinition column, LivenessInfo rowLiveness, SerializationHeader header) { long size = 1; // flags boolean hasValue = cell.value().hasRemaining(); @@ -325,11 +325,11 @@ public class BufferCell extends AbstractCell if (isExpiring && !useRowTTL) size += header.ttlSerializedSize(cell.ttl()); - if (cell.column().isComplex()) - size += cell.column().cellPathSerializer().serializedSize(cell.path()); + if (column.isComplex()) + size += column.cellPathSerializer().serializedSize(cell.path()); if (hasValue) - size += header.getType(cell.column()).writtenLength(cell.value()); + size += header.getType(column).writtenLength(cell.value()); return size; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4c344c5/src/java/org/apache/cassandra/db/rows/Cell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/Cell.java b/src/java/org/apache/cassandra/db/rows/Cell.java index b10ce06..d10cc74 100644 --- a/src/java/org/apache/cassandra/db/rows/Cell.java +++ b/src/java/org/apache/cassandra/db/rows/Cell.java @@ -145,11 +145,11 @@ public abstract class Cell extends ColumnData public interface Serializer { - public void serialize(Cell cell, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException; + public void serialize(Cell cell, ColumnDefinition column, DataOutputPlus out, LivenessInfo rowLiveness, SerializationHeader header) throws IOException; public Cell deserialize(DataInputPlus in, LivenessInfo rowLiveness, ColumnDefinition column, SerializationHeader header, SerializationHelper helper) throws IOException; - public long serializedSize(Cell cell, LivenessInfo rowLiveness, SerializationHeader header); + public long serializedSize(Cell cell, ColumnDefinition column, LivenessInfo rowLiveness, SerializationHeader header); // Returns if the skipped cell was an actual cell (i.e. it had its presence flag). public boolean skip(DataInputPlus in, ColumnDefinition column, SerializationHeader header) throws IOException; http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4c344c5/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java index e4202c9..dc6f187 100644 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java @@ -25,6 +25,7 @@ import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.db.*; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.utils.SearchIterator; /** * Serialize/deserialize a single Unfiltered (both on-wire and on-disk). @@ -177,16 +178,25 @@ public class UnfilteredSerializer if (!hasAllColumns) Columns.serializer.serializeSubset(Collections2.transform(row, ColumnData::column), headerColumns, out); + SearchIterator<ColumnDefinition, ColumnDefinition> si = headerColumns.iterator(); for (ColumnData data : row) { + // We can obtain the column for data directly from data.column(). However, if the cell/complex data + // originates from a sstable, the column we'll get will have the type used when the sstable was serialized, + // and if that type have been recently altered, that may not be the type we want to serialize the column + // with. So we use the ColumnDefinition from the "header" which is "current". Also see #11810 for what + // happens if we don't do that. + ColumnDefinition column = si.next(data.column()); + assert column != null; + if (data.column.isSimple()) - Cell.serializer.serialize((Cell) data, out, pkLiveness, header); + Cell.serializer.serialize((Cell) data, column, out, pkLiveness, header); else - writeComplexColumn((ComplexColumnData) data, hasComplexDeletion, pkLiveness, header, out); + writeComplexColumn((ComplexColumnData) data, column, hasComplexDeletion, pkLiveness, header, out); } } - private void writeComplexColumn(ComplexColumnData data, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header, DataOutputPlus out) + private void writeComplexColumn(ComplexColumnData data, ColumnDefinition column, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header, DataOutputPlus out) throws IOException { if (hasComplexDeletion) @@ -194,7 +204,7 @@ public class UnfilteredSerializer out.writeUnsignedVInt(data.cellsCount()); for (Cell cell : data) - Cell.serializer.serialize(cell, out, rowLiveness, header); + Cell.serializer.serialize(cell, column, out, rowLiveness, header); } private void serialize(RangeTombstoneMarker marker, SerializationHeader header, DataOutputPlus out, long previousUnfilteredSize, int version) @@ -274,18 +284,22 @@ public class UnfilteredSerializer if (!hasAllColumns) size += Columns.serializer.serializedSubsetSize(Collections2.transform(row, ColumnData::column), header.columns(isStatic)); + SearchIterator<ColumnDefinition, ColumnDefinition> si = headerColumns.iterator(); for (ColumnData data : row) { + ColumnDefinition column = si.next(data.column()); + assert column != null; + if (data.column.isSimple()) - size += Cell.serializer.serializedSize((Cell) data, pkLiveness, header); + size += Cell.serializer.serializedSize((Cell) data, column, pkLiveness, header); else - size += sizeOfComplexColumn((ComplexColumnData) data, hasComplexDeletion, pkLiveness, header); + size += sizeOfComplexColumn((ComplexColumnData) data, column, hasComplexDeletion, pkLiveness, header); } return size; } - private long sizeOfComplexColumn(ComplexColumnData data, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header) + private long sizeOfComplexColumn(ComplexColumnData data, ColumnDefinition column, boolean hasComplexDeletion, LivenessInfo rowLiveness, SerializationHeader header) { long size = 0; @@ -294,7 +308,7 @@ public class UnfilteredSerializer size += TypeSizes.sizeofUnsignedVInt(data.cellsCount()); for (Cell cell : data) - size += Cell.serializer.serializedSize(cell, rowLiveness, header); + size += Cell.serializer.serializedSize(cell, column, rowLiveness, header); return size; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4c344c5/test/unit/org/apache/cassandra/cql3/CQLTester.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java index fe03db4..a213edf 100644 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@ -131,6 +131,7 @@ public abstract class CQLTester public static ResultMessage lastSchemaChangeResult; + private List<String> keyspaces = new ArrayList<>(); private List<String> tables = new ArrayList<>(); private List<String> types = new ArrayList<>(); private List<String> functions = new ArrayList<>(); @@ -262,10 +263,12 @@ public abstract class CQLTester usePrepared = USE_PREPARED_VALUES; reusePrepared = REUSE_PREPARED; + final List<String> keyspacesToDrop = copy(keyspaces); final List<String> tablesToDrop = copy(tables); final List<String> typesToDrop = copy(types); final List<String> functionsToDrop = copy(functions); final List<String> aggregatesToDrop = copy(aggregates); + keyspaces = null; tables = null; types = null; functions = null; @@ -290,6 +293,9 @@ public abstract class CQLTester for (int i = typesToDrop.size() - 1; i >= 0; i--) schemaChange(String.format("DROP TYPE IF EXISTS %s.%s", KEYSPACE, typesToDrop.get(i))); + for (int i = keyspacesToDrop.size() - 1; i >= 0; i--) + schemaChange(String.format("DROP KEYSPACE IF EXISTS %s", keyspacesToDrop.get(i))); + // Dropping doesn't delete the sstables. It's not a huge deal but it's cleaner to cleanup after us // Thas said, we shouldn't delete blindly before the TransactionLogs.SSTableTidier for the table we drop // have run or they will be unhappy. Since those taks are scheduled on StorageService.tasks and that's @@ -501,6 +507,22 @@ public abstract class CQLTester schemaChange(fullQuery); } + protected String createKeyspace(String query) + { + String currentKeyspace = createKeyspaceName(); + String fullQuery = String.format(query, currentKeyspace); + logger.info(fullQuery); + schemaChange(fullQuery); + return currentKeyspace; + } + + protected String createKeyspaceName() + { + String currentKeyspace = "keyspace_" + seqNumber.getAndIncrement(); + keyspaces.add(currentKeyspace); + return currentKeyspace; + } + protected String createTable(String query) { String currentTable = createTableName(); @@ -519,8 +541,7 @@ public abstract class CQLTester protected void createTableMayThrow(String query) throws Throwable { - String currentTable = "table_" + seqNumber.getAndIncrement(); - tables.add(currentTable); + String currentTable = createTableName(); String fullQuery = formatQuery(query); logger.info(fullQuery); QueryProcessor.executeOnceInternal(fullQuery); @@ -825,6 +846,16 @@ public abstract class CQLTester */ public static void assertRowsIgnoringOrder(UntypedResultSet result, Object[]... rows) { + assertRowsIgnoringOrderInternal(result, false, rows); + } + + public static void assertRowsIgnoringOrderAndExtra(UntypedResultSet result, Object[]... rows) + { + assertRowsIgnoringOrderInternal(result, true, rows); + } + + private static void assertRowsIgnoringOrderInternal(UntypedResultSet result, boolean ignoreExtra, Object[]... rows) + { if (result == null) { if (rows.length > 0) @@ -855,7 +886,7 @@ public abstract class CQLTester com.google.common.collect.Sets.SetView<List<ByteBuffer>> extra = com.google.common.collect.Sets.difference(actualRows, expectedRows); com.google.common.collect.Sets.SetView<List<ByteBuffer>> missing = com.google.common.collect.Sets.difference(expectedRows, actualRows); - if (!extra.isEmpty() || !missing.isEmpty()) + if ((!ignoreExtra && !extra.isEmpty()) || !missing.isEmpty()) { List<String> extraRows = makeRowStrings(extra, meta); List<String> missingRows = makeRowStrings(missing, meta); @@ -876,7 +907,7 @@ public abstract class CQLTester Assert.fail("Missing " + missing.size() + " row(s) in result: \n " + missingRows.stream().collect(Collectors.joining("\n "))); } - assert expectedRows.size() == actualRows.size(); + assert ignoreExtra || expectedRows.size() == actualRows.size(); } private static List<String> makeRowStrings(Iterable<List<ByteBuffer>> rows, List<ColumnSpecification> meta) http://git-wip-us.apache.org/repos/asf/cassandra/blob/e4c344c5/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java index 9f8bea2..509aeac 100644 --- a/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java +++ b/test/unit/org/apache/cassandra/cql3/validation/operations/AlterTest.java @@ -24,6 +24,8 @@ import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.SyntaxException; import org.apache.cassandra.schema.SchemaKeyspace; +import org.apache.cassandra.transport.Server; +import org.apache.cassandra.utils.ByteBufferUtil; import org.junit.Assert; import org.junit.Test; @@ -130,37 +132,33 @@ public class AlterTest extends CQLTester assertInvalidThrow(SyntaxException.class, "CREATE KEYSPACE ks1"); assertInvalidThrow(ConfigurationException.class, "CREATE KEYSPACE ks1 WITH replication= { 'replication_factor' : 1 }"); - execute("CREATE KEYSPACE ks1 WITH replication={ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"); - execute("CREATE KEYSPACE ks2 WITH replication={ 'class' : 'SimpleStrategy', 'replication_factor' : 1 } AND durable_writes=false"); + String ks1 = createKeyspace("CREATE KEYSPACE %s WITH replication={ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"); + String ks2 = createKeyspace("CREATE KEYSPACE %s WITH replication={ 'class' : 'SimpleStrategy', 'replication_factor' : 1 } AND durable_writes=false"); - assertRows(execute("SELECT keyspace_name, durable_writes FROM system_schema.keyspaces"), - row("ks1", true), + assertRowsIgnoringOrderAndExtra(execute("SELECT keyspace_name, durable_writes FROM system_schema.keyspaces"), row(KEYSPACE, true), row(KEYSPACE_PER_TEST, true), - row("ks2", false)); + row(ks1, true), + row(ks2, false)); - execute("ALTER KEYSPACE ks1 WITH replication = { 'class' : 'NetworkTopologyStrategy', 'dc1' : 1 } AND durable_writes=False"); - execute("ALTER KEYSPACE ks2 WITH durable_writes=true"); + schemaChange("ALTER KEYSPACE " + ks1 + " WITH replication = { 'class' : 'NetworkTopologyStrategy', 'dc1' : 1 } AND durable_writes=False"); + schemaChange("ALTER KEYSPACE " + ks2 + " WITH durable_writes=true"); - assertRows(execute("SELECT keyspace_name, durable_writes, replication FROM system_schema.keyspaces"), - row("ks1", false, map("class", "org.apache.cassandra.locator.NetworkTopologyStrategy", "dc1", "1")), + assertRowsIgnoringOrderAndExtra(execute("SELECT keyspace_name, durable_writes, replication FROM system_schema.keyspaces"), row(KEYSPACE, true, map("class", "org.apache.cassandra.locator.SimpleStrategy", "replication_factor", "1")), row(KEYSPACE_PER_TEST, true, map("class", "org.apache.cassandra.locator.SimpleStrategy", "replication_factor", "1")), - row("ks2", true, map("class", "org.apache.cassandra.locator.SimpleStrategy", "replication_factor", "1"))); + row(ks1, false, map("class", "org.apache.cassandra.locator.NetworkTopologyStrategy", "dc1", "1")), + row(ks2, true, map("class", "org.apache.cassandra.locator.SimpleStrategy", "replication_factor", "1"))); - execute("USE ks1"); + execute("USE " + ks1); assertInvalidThrow(ConfigurationException.class, "CREATE TABLE cf1 (a int PRIMARY KEY, b int) WITH compaction = { 'min_threshold' : 4 }"); execute("CREATE TABLE cf1 (a int PRIMARY KEY, b int) WITH compaction = { 'class' : 'SizeTieredCompactionStrategy', 'min_threshold' : 7 }"); - assertRows(execute("SELECT table_name, compaction FROM system_schema.tables WHERE keyspace_name='ks1'"), + assertRows(execute("SELECT table_name, compaction FROM system_schema.tables WHERE keyspace_name='" + ks1 + "'"), row("cf1", map("class", "org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy", "min_threshold", "7", "max_threshold", "32"))); - - // clean-up - execute("DROP KEYSPACE ks1"); - execute("DROP KEYSPACE ks2"); } /** @@ -324,4 +322,20 @@ public class AlterTest extends CQLTester createTable("CREATE TABLE %s (key blob, column1 blob, value blob, PRIMARY KEY ((key), column1)) WITH COMPACT STORAGE"); assertInvalidThrow(InvalidRequestException.class, "ALTER TABLE %s ALTER column1 TYPE ascii"); } + + @Test + public void testAlterToBlob() throws Throwable + { + // This tests for the bug from #11820 in particular + + createTable("CREATE TABLE %s (a int PRIMARY KEY, b int)"); + + execute("INSERT INTO %s (a, b) VALUES (1, 1)"); + + executeNet(Server.CURRENT_VERSION, "ALTER TABLE %s ALTER b TYPE BLOB"); + + assertRowsNet(Server.CURRENT_VERSION, executeNet(Server.CURRENT_VERSION, "SELECT * FROM %s WHERE a = 1"), + row(1, ByteBufferUtil.bytes(1)) + ); + } }