This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch cassandra-3.11 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 6301bea273fb9cd1afdca3842ec68f19f13b17b6 Merge: 91c12bd 1d87da3 Author: Marcus Eriksson <[email protected]> AuthorDate: Mon Jun 21 15:46:51 2021 +0200 Merge branch 'cassandra-3.0' into cassandra-3.11 CHANGES.txt | 1 + .../apache/cassandra/config/ColumnDefinition.java | 23 ----- src/java/org/apache/cassandra/db/Columns.java | 18 +--- .../apache/cassandra/db/SerializationHeader.java | 17 +--- .../cassandra/db/UnknownColumnException.java | 12 +-- .../apache/cassandra/db/filter/ColumnFilter.java | 8 +- .../cassandra/db/partitions/PartitionUpdate.java | 7 -- .../cassandra/db/rows/UnfilteredSerializer.java | 16 +-- .../cassandra/distributed/test/SchemaTest.java | 113 ++++++++------------- .../distributed/test/SimpleReadWriteTest.java | 50 --------- test/unit/org/apache/cassandra/db/ColumnsTest.java | 2 +- 11 files changed, 58 insertions(+), 209 deletions(-) diff --cc CHANGES.txt index 34a5973,91d85be..83fc711 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,13 -1,5 +1,14 @@@ -3.0.25: +3.11.11 + * Fix LeveledCompactionStrategy compacts last level throw an ArrayIndexOutOfBoundsException (CASSANDRA-15669) + * Maps $CASSANDRA_LOG_DIR to cassandra.logdir java property when executing nodetool (CASSANDRA-16199) + * Nodetool garbagecollect should retain SSTableLevel for LCS (CASSANDRA-16634) + * Ignore stale acks received in the shadow round (CASSANDRA-16588) + * Add autocomplete and error messages for provide_overlapping_tombstones (CASSANDRA-16350) + * Add StorageServiceMBean.getKeyspaceReplicationInfo(keyspaceName) (CASSANDRA-16447) + * Make sure sstables with moved starts are removed correctly in LeveledGenerations (CASSANDRA-16552) + * Upgrade jackson-databind to 2.9.10.8 (CASSANDRA-16462) +Merged from 3.0: + * Adding columns via ALTER TABLE can generate corrupt sstables (CASSANDRA-16735) * Add flag to disable ALTER...DROP COMPACT STORAGE statements (CASSANDRA-16733) * Clean transaction log leftovers at the beginning of sstablelevelreset and sstableofflinerelevel (CASSANDRA-12519) * CQL shell should prefer newer TLS version by default (CASSANDRA-16695) diff --cc src/java/org/apache/cassandra/db/Columns.java index 512b695,18e17d7..8efb848 --- a/src/java/org/apache/cassandra/db/Columns.java +++ b/src/java/org/apache/cassandra/db/Columns.java @@@ -460,13 -441,8 +460,9 @@@ public class Columns extends AbstractCo // fail deserialization because of that. So we grab a "fake" ColumnDefinition that ensure proper // deserialization. The column will be ignore later on anyway. column = metadata.getDroppedColumnDefinition(name); + - // If there's no dropped column, it may be for a column we haven't received a schema update for yet - // so we create a placeholder column. If this is a read, the placeholder column will let the response - // serializer know we're not serializing all requested columns when it writes the row flags, but it - // will cause mutations that try to write values for this column to fail. if (column == null) - column = ColumnDefinition.placeholder(metadata, name, isStatic); + throw new RuntimeException("Unknown column " + UTF8Type.instance.getString(name) + " during deserialization"); } builder.add(column); } diff --cc src/java/org/apache/cassandra/db/filter/ColumnFilter.java index dfbce4a,c658c12..d7b4c81 --- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java @@@ -509,11 -406,11 +509,11 @@@ public class ColumnFilte } } - if (hasSelection) + if (hasQueried) { - Columns statics = Columns.serializer.deserializeStatics(in, metadata); - Columns regulars = Columns.serializer.deserializeRegulars(in, metadata); + Columns statics = Columns.serializer.deserialize(in, metadata); + Columns regulars = Columns.serializer.deserialize(in, metadata); - selection = new PartitionColumns(statics, regulars); + queried = new PartitionColumns(statics, regulars); } SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections = null; diff --cc src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java index 0890611,0342e39..c81ac9d --- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java @@@ -19,18 -19,13 +19,17 @@@ package org.apache.cassandra.db.rows import java.io.IOException; -import com.google.common.collect.Collections2; +import net.nicoulaj.compilecommand.annotations.Inline; import org.apache.cassandra.config.ColumnDefinition; - import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.Row.Deletion; import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.utils.SearchIterator; +import org.apache.cassandra.utils.WrappedException; /** * Serialize/deserialize a single Unfiltered (both on-wire and on-disk). @@@ -156,7 -133,7 +155,7 @@@ public class UnfilteredSerialize LivenessInfo pkLiveness = row.primaryKeyLivenessInfo(); Row.Deletion deletion = row.deletion(); boolean hasComplexDeletion = row.hasComplexDeletion(); - boolean hasAllColumns = header.hasAllColumns(row, isStatic); - boolean hasAllColumns = (row.columnCount() == headerColumns.size()); ++ boolean hasAllColumns = row.columnCount() == headerColumns.size(); boolean hasExtendedFlags = hasExtendedFlags(row); if (isStatic) @@@ -230,42 -184,20 +229,37 @@@ Columns.serializer.serializeSubset(row.columns(), headerColumns, out); SearchIterator<ColumnDefinition, ColumnDefinition> si = headerColumns.iterator(); - for (ColumnData data : row) + + try { - // We can obtain the column for data directly from data.column(). However, if the cell/complex data - // originates from a sstable, the column we'll get will have the type used when the sstable was serialized, - // and if that type have been recently altered, that may not be the type we want to serialize the column - // with. So we use the ColumnDefinition from the "header" which is "current". Also see #11810 for what - // happens if we don't do that. - ColumnDefinition column = si.next(data.column()); - assert column != null; + row.apply(cd -> { + // We can obtain the column for data directly from data.column(). However, if the cell/complex data + // originates from a sstable, the column we'll get will have the type used when the sstable was serialized, + // and if that type have been recently altered, that may not be the type we want to serialize the column + // with. So we use the ColumnDefinition from the "header" which is "current". Also see #11810 for what + // happens if we don't do that. + ColumnDefinition column = si.next(cd.column()); - - // we may have columns that the remote node isn't aware of due to inflight schema changes - // in cases where it tries to fetch all columns, it will set the `all columns` flag, but only - // expect a subset of columns (from this node's perspective). See CASSANDRA-15899 - if (column == null) - return; ++ assert column != null; + + try + { + if (cd.column.isSimple()) + Cell.serializer.serialize((Cell) cd, column, out, pkLiveness, header); + else + writeComplexColumn((ComplexColumnData) cd, column, (flags & HAS_COMPLEX_DELETION) != 0, pkLiveness, header, out); + } + catch (IOException e) + { + throw new WrappedException(e); + } + }, false); + } + catch (WrappedException e) + { + if (e.getCause() instanceof IOException) + throw (IOException) e.getCause(); - if (data.column.isSimple()) - Cell.serializer.serialize((Cell) data, column, out, pkLiveness, header); - else - writeComplexColumn((ComplexColumnData) data, column, hasComplexDeletion, pkLiveness, header, out); + throw e; } } @@@ -342,7 -274,7 +336,7 @@@ LivenessInfo pkLiveness = row.primaryKeyLivenessInfo(); Row.Deletion deletion = row.deletion(); boolean hasComplexDeletion = row.hasComplexDeletion(); - boolean hasAllColumns = header.hasAllColumns(row, isStatic); - boolean hasAllColumns = (row.columnCount() == headerColumns.size()); ++ boolean hasAllColumns = row.columnCount() == headerColumns.size(); if (!pkLiveness.isEmpty()) size += header.timestampSerializedSize(pkLiveness.timestamp()); @@@ -603,35 -482,12 +597,31 @@@ builder.addRowDeletion(hasDeletion ? new Row.Deletion(header.readDeletionTime(in), deletionIsShadowable) : Row.Deletion.LIVE); Columns columns = hasAllColumns ? headerColumns : Columns.serializer.deserializeSubset(headerColumns, in); - for (ColumnDefinition column : columns) + + final LivenessInfo livenessInfo = rowLiveness; + + try { - if (column.isSimple()) - readSimpleColumn(column, in, header, helper, builder, rowLiveness); - else - readComplexColumn(column, in, header, helper, hasComplexDeletion, builder, rowLiveness); + columns.apply(column -> { + try + { - // if the column is a placeholder, then it's not part of our schema, and we can't deserialize it - if (column.isPlaceholder()) - throw new UnknownColumnException(column.ksName, column.cfName, column.name.bytes); - + if (column.isSimple()) + readSimpleColumn(column, in, header, helper, builder, livenessInfo); + else + readComplexColumn(column, in, header, helper, hasComplexDeletion, builder, livenessInfo); + } + catch (IOException e) + { + throw new WrappedException(e); + } + }, false); + } + catch (WrappedException e) + { + if (e.getCause() instanceof IOException) + throw (IOException) e.getCause(); + + throw e; } return builder.build(); diff --cc test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java index 2b5dab1,b8860fd..8f7b47f --- a/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/SchemaTest.java @@@ -18,16 -18,12 +18,11 @@@ package org.apache.cassandra.distributed.test; - import java.util.function.Consumer; - import org.junit.Test; - import org.apache.cassandra.dht.ByteOrderedPartitioner; import org.apache.cassandra.distributed.Cluster; import org.apache.cassandra.distributed.api.ConsistencyLevel; - import org.apache.cassandra.distributed.api.IInstanceConfig; -- - import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows; + import static org.junit.Assert.assertTrue; public class SchemaTest extends TestBaseImpl { diff --cc test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java index 84a2307,3e8b76b..b7cef5f --- a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java @@@ -137,47 -134,11 +137,16 @@@ public class SimpleReadWriteTest extend } Assert.assertTrue(thrown.getMessage().contains("Exception occurred on node")); - Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2 during deserialization")); + Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2")); } - /** - * If a node receives a mutation for a column it knows has been dropped, the write should succeed - */ - @Test - public void writeWithSchemaDisagreement2() throws Throwable - { - cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))"); + - cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)"); - cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)"); - cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)"); - - for (int i=0; i<cluster.size(); i++) - cluster.get(i+1).flush(KEYSPACE);; - - // Introduce schema disagreement - cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl DROP v2", 1); - - // execute a write including the dropped column where the coordinator is not yet aware of the drop - // all nodes should process this without error - cluster.coordinator(2).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)", - ConsistencyLevel.ALL); - // and flushing should also be fine - for (int i=0; i<cluster.size(); i++) - cluster.get(i+1).flush(KEYSPACE);; - // the results of reads will vary depending on whether the coordinator has seen the schema change - // note: read repairs will propagate the v2 value to node1, but this is safe and handled correctly - assertRows(cluster.coordinator(2).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL), - rows(row(1,1,1,1), row(2,2,2,2))); - assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL), - rows(row(1,1,1), row(2,2,2))); - } + + /** + * If a node isn't aware of a column, but receives a mutation without that column, the write should succeed + */ @Test - public void readWithSchemaDisagreement() throws Throwable + public void writeWithInconsequentialSchemaDisagreement() throws Throwable { cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))"); @@@ -188,30 -149,22 +157,11 @@@ // Introduce schema disagreement cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1); - Exception thrown = null; - try - { - assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", - ConsistencyLevel.ALL), - row(1, 1, 1, null)); - } - catch (Exception e) - { - thrown = e; - } - - Assert.assertTrue(thrown.getMessage().contains("Exception occurred on node")); - Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2 during deserialization")); + // this write shouldn't cause any problems because it doesn't write to the new column + cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (2, 2, 2)", + ConsistencyLevel.ALL); } - /** - * If a node receives a read for a column it's not aware of, it shouldn't complain, since it won't have any data for that column - */ - @Test - public void readWithSchemaDisagreement() throws Throwable - { - cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))"); - - cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); - cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); - cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1) VALUES (1, 1, 1)"); - - // Introduce schema disagreement - cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl ADD v2 int", 1); - - Object[][] expected = new Object[][]{new Object[]{1, 1, 1, null}}; - assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl WHERE pk = 1", ConsistencyLevel.ALL), expected); - } - @Test public void simplePagedReadsTest() throws Throwable { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
