Repository: cassandra Updated Branches: refs/heads/cassandra-3.X 25d4c7baa -> 57e9a83b2 refs/heads/trunk 78ff37707 -> c18968b1b
cdc column addition still breaks schema migration tasks patch by Sylvain Lebresne; reviewed by Aleksey Yeschenko for CASSANDRA-12697 Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/57e9a83b Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/57e9a83b Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/57e9a83b Branch: refs/heads/cassandra-3.X Commit: 57e9a83b2abf08d7a1261e8f7a9f435d221a1f81 Parents: 25d4c7b Author: Sylvain Lebresne <sylv...@datastax.com> Authored: Fri Sep 23 11:26:22 2016 +0200 Committer: Sylvain Lebresne <sylv...@datastax.com> Committed: Thu Sep 29 17:00:07 2016 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../cassandra/db/rows/WithOnlyQueriedData.java | 7 +++++ .../cassandra/db/transform/Transformation.java | 12 +++++++++ .../cassandra/db/transform/UnfilteredRows.java | 11 ++++++++ .../apache/cassandra/schema/SchemaKeyspace.java | 28 +++++++++++++++++++- 5 files changed, 58 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/57e9a83b/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a9e46f7..c33b1d3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.10 + * cdc column addition still breaks schema migration tasks (CASSANDRA-12697) * Upgrade metrics-reporter dependencies (CASSANDRA-12089) * Tune compaction thread count via nodetool (CASSANDRA-12248) * Add +=/-= shortcut syntax for update queries (CASSANDRA-12232) http://git-wip-us.apache.org/repos/asf/cassandra/blob/57e9a83b/src/java/org/apache/cassandra/db/rows/WithOnlyQueriedData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/WithOnlyQueriedData.java b/src/java/org/apache/cassandra/db/rows/WithOnlyQueriedData.java index 0b407f2..dcf0891 100644 --- a/src/java/org/apache/cassandra/db/rows/WithOnlyQueriedData.java +++ b/src/java/org/apache/cassandra/db/rows/WithOnlyQueriedData.java @@ -17,6 +17,7 @@ */ package org.apache.cassandra.db.rows; +import org.apache.cassandra.db.PartitionColumns; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.transform.Transformation; @@ -35,6 +36,12 @@ public class WithOnlyQueriedData<I extends BaseRowIterator<?>> extends Transform } @Override + protected PartitionColumns applyToPartitionColumns(PartitionColumns columns) + { + return filter.queriedColumns(); + } + + @Override protected Row applyToStatic(Row row) { return row.withOnlyQueriedData(filter); http://git-wip-us.apache.org/repos/asf/cassandra/blob/57e9a83b/src/java/org/apache/cassandra/db/transform/Transformation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/Transformation.java b/src/java/org/apache/cassandra/db/transform/Transformation.java index 3134725..33c1fe7 100644 --- a/src/java/org/apache/cassandra/db/transform/Transformation.java +++ b/src/java/org/apache/cassandra/db/transform/Transformation.java @@ -22,6 +22,7 @@ package org.apache.cassandra.db.transform; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.PartitionColumns; import org.apache.cassandra.db.partitions.PartitionIterator; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.rows.*; @@ -109,6 +110,17 @@ public abstract class Transformation<I extends BaseRowIterator<?>> return deletionTime; } + /** + * Applied to the {@code PartitionColumns} of any rows iterator. + * + * NOTE: same remark than for applyToDeletion: it is only applied to the first iterator in a sequence of iterators + * filled by MoreContents. + */ + protected PartitionColumns applyToPartitionColumns(PartitionColumns columns) + { + return columns; + } + //****************************************************** // Static Application Methods http://git-wip-us.apache.org/repos/asf/cassandra/blob/57e9a83b/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java b/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java index f000fcf..ba86066 100644 --- a/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java +++ b/src/java/org/apache/cassandra/db/transform/UnfilteredRows.java @@ -21,17 +21,20 @@ package org.apache.cassandra.db.transform; import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.PartitionColumns; import org.apache.cassandra.db.rows.EncodingStats; import org.apache.cassandra.db.rows.Unfiltered; import org.apache.cassandra.db.rows.UnfilteredRowIterator; final class UnfilteredRows extends BaseRows<Unfiltered, UnfilteredRowIterator> implements UnfilteredRowIterator { + private PartitionColumns partitionColumns; private DeletionTime partitionLevelDeletion; public UnfilteredRows(UnfilteredRowIterator input) { super(input); + partitionColumns = input.columns(); partitionLevelDeletion = input.partitionLevelDeletion(); } @@ -39,9 +42,17 @@ final class UnfilteredRows extends BaseRows<Unfiltered, UnfilteredRowIterator> i void add(Transformation add) { super.add(add); + partitionColumns = add.applyToPartitionColumns(partitionColumns); partitionLevelDeletion = add.applyToDeletion(partitionLevelDeletion); } + @Override + public PartitionColumns columns() + { + return partitionColumns; + } + + @Override public DeletionTime partitionLevelDeletion() { return partitionLevelDeletion; http://git-wip-us.apache.org/repos/asf/cassandra/blob/57e9a83b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java index 57c0b89..36a8072 100644 --- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java +++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java @@ -40,6 +40,7 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.view.View; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.InvalidRequestException; @@ -369,12 +370,37 @@ public final class SchemaKeyspace mutationMap.put(key, mutation); } - mutation.add(PartitionUpdate.fromIterator(partition, cmd.columnFilter())); + mutation.add(makeUpdateForSchema(partition, cmd.columnFilter())); } } } } + /** + * Creates a PartitionUpdate from a partition containing some schema table content. + * This is mainly calling {@code PartitionUpdate.fromIterator} except for the fact that it deals with + * the problem described in #12236. + */ + private static PartitionUpdate makeUpdateForSchema(UnfilteredRowIterator partition, ColumnFilter filter) + { + // This method is used during schema migration tasks, and if cdc is disabled, we want to force excluding the + // 'cdc' column from the TABLES schema table because it is problematic if received by older nodes (see #12236 + // and #12697). Otherwise though, we just simply "buffer" the content of the partition into a PartitionUpdate. + if (DatabaseDescriptor.isCDCEnabled() || !partition.metadata().cfName.equals(TABLES)) + return PartitionUpdate.fromIterator(partition, filter); + + // We want to skip the 'cdc' column. A simple solution for that is based on the fact that + // 'PartitionUpdate.fromIterator()' will ignore any columns that are marked as 'fetched' but not 'queried'. + ColumnFilter.Builder builder = ColumnFilter.allColumnsBuilder(partition.metadata()); + for (ColumnDefinition column : filter.fetchedColumns()) + { + if (!column.name.toString().equals("cdc")) + builder.add(column); + } + + return PartitionUpdate.fromIterator(partition, builder.build()); + } + private static boolean isSystemKeyspaceSchemaPartition(DecoratedKey partitionKey) { return SchemaConstants.isSystemKeyspace(UTF8Type.instance.compose(partitionKey.getKey()));