This is an automated email from the ASF dual-hosted git repository. bdeggleston pushed a commit to branch cassandra-3.11 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 9bf1ab1a6a8393a1e5cc67041f6e85dd9065b9f4 Merge: 3f73c16 31b9078 Author: Blake Eggleston <bdeggles...@gmail.com> AuthorDate: Mon Oct 5 14:20:16 2020 -0700 Merge branch 'cassandra-3.0' into cassandra-3.11 CHANGES.txt | 1 + .../apache/cassandra/config/ColumnDefinition.java | 23 ++++ src/java/org/apache/cassandra/db/Columns.java | 19 +++- .../apache/cassandra/db/SerializationHeader.java | 17 ++- .../cassandra/db/UnknownColumnException.java | 12 ++- .../apache/cassandra/db/filter/ColumnFilter.java | 8 +- .../cassandra/db/partitions/PartitionUpdate.java | 7 ++ .../cassandra/db/rows/UnfilteredSerializer.java | 20 +++- .../cassandra/distributed/test/SchemaTest.java | 117 +++++++++++++++++++++ .../distributed/test/SimpleReadWriteTest.java | 87 ++++++++++++--- test/unit/org/apache/cassandra/db/ColumnsTest.java | 2 +- 11 files changed, 280 insertions(+), 33 deletions(-) diff --cc CHANGES.txt index b735ba5,1ea5184..99369fa --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,17 -1,10 +1,18 @@@ -3.0.23: - * Handle unexpected columns due to schema races (CASSANDRA-15899) +3.11.9 + * Fix memory leak in CompressedChunkReader (CASSANDRA-15880) + * Don't attempt value skipping with mixed version cluster (CASSANDRA-15833) * Avoid failing compactions with very large partitions (CASSANDRA-15164) - * Use IF NOT EXISTS for index and UDT create statements in snapshot schema files (CASSANDRA-13935) + * Make sure LCS handles duplicate sstable added/removed notifications correctly (CASSANDRA-14103) +Merged from 3.0: ++ * Handle unexpected columns due to schema races (CASSANDRA-15899) * Add flag to ignore unreplicated keyspaces during repair (CASSANDRA-15160) -3.0.22: +3.11.8 + * Correctly interpret SASI's `max_compaction_flush_memory_in_mb` setting in megabytes not bytes (CASSANDRA-16071) + * Fix short read protection for GROUP BY queries (CASSANDRA-15459) + * Frozen RawTuple is not annotated with frozen in the toString method (CASSANDRA-15857) +Merged from 3.0: + * Use IF NOT EXISTS for index and UDT create statements in snapshot schema files (CASSANDRA-13935) * Fix gossip shutdown order (CASSANDRA-15816) * Remove broken 'defrag-on-read' optimization (CASSANDRA-15432) * Check for endpoint collision with hibernating nodes (CASSANDRA-14599) diff --cc src/java/org/apache/cassandra/db/filter/ColumnFilter.java index 57ff729,858c944..3c79539 --- a/src/java/org/apache/cassandra/db/filter/ColumnFilter.java +++ b/src/java/org/apache/cassandra/db/filter/ColumnFilter.java @@@ -481,11 -441,11 +481,11 @@@ public class ColumnFilte } } - if (hasSelection) + if (hasQueried) { - Columns statics = Columns.serializer.deserialize(in, metadata); - Columns regulars = Columns.serializer.deserialize(in, metadata); + Columns statics = Columns.serializer.deserializeStatics(in, metadata); + Columns regulars = Columns.serializer.deserializeRegulars(in, metadata); - selection = new PartitionColumns(statics, regulars); + queried = new PartitionColumns(statics, regulars); } SortedSetMultimap<ColumnIdentifier, ColumnSubselection> subSelections = null; diff --cc src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java index 926f3ef,9e11f94..0890611 --- a/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java +++ b/src/java/org/apache/cassandra/db/rows/UnfilteredSerializer.java @@@ -19,18 -19,13 +19,18 @@@ package org.apache.cassandra.db.rows import java.io.IOException; - import com.google.common.collect.Collections2; +import net.nicoulaj.compilecommand.annotations.Inline; import org.apache.cassandra.config.ColumnDefinition; + import org.apache.cassandra.db.marshal.UTF8Type; import org.apache.cassandra.db.*; +import org.apache.cassandra.db.rows.Row.Deletion; import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.io.util.FileDataInput; import org.apache.cassandra.utils.SearchIterator; +import org.apache.cassandra.utils.WrappedException; /** * Serialize/deserialize a single Unfiltered (both on-wire and on-disk). @@@ -230,37 -184,25 +230,42 @@@ public class UnfilteredSerialize Columns.serializer.serializeSubset(row.columns(), headerColumns, out); SearchIterator<ColumnDefinition, ColumnDefinition> si = headerColumns.iterator(); - for (ColumnData data : row) - { - // We can obtain the column for data directly from data.column(). However, if the cell/complex data - // originates from a sstable, the column we'll get will have the type used when the sstable was serialized, - // and if that type have been recently altered, that may not be the type we want to serialize the column - // with. So we use the ColumnDefinition from the "header" which is "current". Also see #11810 for what - // happens if we don't do that. - ColumnDefinition column = si.next(data.column()); - // we may have columns that the remote node isn't aware of due to inflight schema changes - // in cases where it tries to fetch all columns, it will set the `all columns` flag, but only - // expect a subset of columns (from this node's perspective). See CASSANDRA-15899 - if (column == null) - continue; + try + { + row.apply(cd -> { + // We can obtain the column for data directly from data.column(). However, if the cell/complex data + // originates from a sstable, the column we'll get will have the type used when the sstable was serialized, + // and if that type have been recently altered, that may not be the type we want to serialize the column + // with. So we use the ColumnDefinition from the "header" which is "current". Also see #11810 for what + // happens if we don't do that. + ColumnDefinition column = si.next(cd.column()); - assert column != null : cd.column.toString(); ++ ++ // we may have columns that the remote node isn't aware of due to inflight schema changes ++ // in cases where it tries to fetch all columns, it will set the `all columns` flag, but only ++ // expect a subset of columns (from this node's perspective). See CASSANDRA-15899 ++ if (column == null) ++ return; + + try + { + if (cd.column.isSimple()) + Cell.serializer.serialize((Cell) cd, column, out, pkLiveness, header); + else + writeComplexColumn((ComplexColumnData) cd, column, (flags & HAS_COMPLEX_DELETION) != 0, pkLiveness, header, out); + } + catch (IOException e) + { + throw new WrappedException(e); + } + }, false); + } + catch (WrappedException e) + { + if (e.getCause() instanceof IOException) + throw (IOException) e.getCause(); - if (data.column.isSimple()) - Cell.serializer.serialize((Cell) data, column, out, pkLiveness, header); - else - writeComplexColumn((ComplexColumnData) data, column, hasComplexDeletion, pkLiveness, header, out); + throw e; } } @@@ -597,31 -488,15 +603,35 @@@ builder.addRowDeletion(hasDeletion ? new Row.Deletion(header.readDeletionTime(in), deletionIsShadowable) : Row.Deletion.LIVE); Columns columns = hasAllColumns ? headerColumns : Columns.serializer.deserializeSubset(headerColumns, in); - for (ColumnDefinition column : columns) + + final LivenessInfo livenessInfo = rowLiveness; + + try { - // if the column is a placeholder, then it's not part of our schema, and we can't deserialize it - if (column.isPlaceholder()) - throw new UnknownColumnException(column.ksName, column.cfName, column.name.bytes); - if (column.isSimple()) - readSimpleColumn(column, in, header, helper, builder, rowLiveness); - else - readComplexColumn(column, in, header, helper, hasComplexDeletion, builder, rowLiveness); + columns.apply(column -> { + try + { ++ // if the column is a placeholder, then it's not part of our schema, and we can't deserialize it ++ if (column.isPlaceholder()) ++ throw new UnknownColumnException(column.ksName, column.cfName, column.name.bytes); ++ + if (column.isSimple()) + readSimpleColumn(column, in, header, helper, builder, livenessInfo); + else + readComplexColumn(column, in, header, helper, hasComplexDeletion, builder, livenessInfo); + } + catch (IOException e) + { + throw new WrappedException(e); + } + }, false); + } + catch (WrappedException e) + { + if (e.getCause() instanceof IOException) + throw (IOException) e.getCause(); + + throw e; } return builder.build(); diff --cc test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java index 75e5ba9,17064fa..05f3458 --- a/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/SimpleReadWriteTest.java @@@ -116,11 -119,46 +119,47 @@@ public class SimpleReadWriteTest extend } Assert.assertTrue(thrown.getMessage().contains("Exception occurred on node")); - Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2 during deserialization")); ++ Assert.assertTrue(thrown.getCause().getCause().getCause().getMessage().contains("Unknown column v2")); } + /** + * If a node receives a mutation for a column it knows has been dropped, the write should succeed + */ @Test - public void readWithSchemaDisagreement() throws Throwable + public void writeWithSchemaDisagreement2() throws Throwable + { + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, v2 int, PRIMARY KEY (pk, ck))"); + + cluster.get(1).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)"); + cluster.get(2).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)"); + cluster.get(3).executeInternal("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (1, 1, 1, 1)"); + + for (int i=0; i<cluster.size(); i++) + cluster.get(i+1).flush(KEYSPACE);; + + // Introduce schema disagreement + cluster.schemaChange("ALTER TABLE " + KEYSPACE + ".tbl DROP v2", 1); + + // execute a write including the dropped column where the coordinator is not yet aware of the drop + // all nodes should process this without error + cluster.coordinator(2).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v1, v2) VALUES (2, 2, 2, 2)", + ConsistencyLevel.ALL); + // and flushing should also be fine + for (int i=0; i<cluster.size(); i++) + cluster.get(i+1).flush(KEYSPACE);; + // the results of reads will vary depending on whether the coordinator has seen the schema change + // note: read repairs will propagate the v2 value to node1, but this is safe and handled correctly + assertRows(cluster.coordinator(2).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL), + rows(row(1,1,1,1), row(2,2,2,2))); + assertRows(cluster.coordinator(1).execute("SELECT * FROM " + KEYSPACE + ".tbl", ConsistencyLevel.ALL), + rows(row(1,1,1), row(2,2,2))); + } + + /** + * If a node isn't aware of a column, but receives a mutation without that column, the write should succeed + */ + @Test + public void writeWithInconsequentialSchemaDisagreement() throws Throwable { cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v1 int, PRIMARY KEY (pk, ck))"); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org