Drop/add column name with different Kind can result in corruption

patch by Benedict; reviewed by Sam for CASSANDRA-14843
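
For context, the failure mode the patch addresses: an sstable's serialization header records each column's kind (static vs regular), but toHeader() previously resolved column definitions by name alone, so a column dropped under one kind and re-added under the other was deserialized against the live definition of the wrong kind. A minimal sketch of the hazard in plain Java (the Kind enum and names are illustrative stand-ins, not Cassandra API):

    import java.util.HashMap;
    import java.util.Map;

    // Illustrative stand-ins only, not Cassandra API: "v" was written to an
    // sstable as a static column, then dropped and re-added as a regular column.
    public class WrongKindSketch
    {
        enum Kind { STATIC, REGULAR }

        public static void main(String[] args)
        {
            Map<String, Kind> liveSchema = new HashMap<>();
            liveSchema.put("v", Kind.REGULAR);  // v re-added as a regular column
            Kind kindInHeader = Kind.STATIC;    // the old sstable recorded v as static

            // Old behaviour: resolve by name alone; the live definition wins even
            // though its kind disagrees with the sstable header.
            Kind resolved = liveSchema.get("v");
            System.out.println(resolved);       // REGULAR -- wrong kind for this sstable

            // Fixed behaviour: a live definition of the wrong kind is treated like
            // a dropped column, so the kind recorded in the header is honoured.
            Kind live = liveSchema.get("v");
            Kind fixed = (live != null && live == kindInHeader) ? live : kindInHeader;
            System.out.println(fixed);          // STATIC
        }
    }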


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/4b1f40d5
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/4b1f40d5
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/4b1f40d5

Branch: refs/heads/trunk
Commit: 4b1f40d5382638bf3913293b713d5d22b57c844d
Parents: f7630e4
Author: Benedict Elliott Smith <bened...@apache.org>
Authored: Tue Nov 27 16:22:05 2018 +0000
Committer: Benedict Elliott Smith <bened...@apache.org>
Committed: Thu Nov 29 14:29:20 2018 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../cassandra/db/SerializationHeader.java       |  44 ++++---
 .../cassandra/db/SerializationHeaderTest.java   | 129 +++++++++++++++++++
 3 files changed, 155 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b1f40d5/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 6e18de1..060fa9d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,6 +1,7 @@
 3.0.18
  * RangeTombstoneList doesn't properly clean up mergeable or superseded rts in some cases (CASSANDRA-14894)
  * Fix handling of collection tombstones for dropped columns from legacy sstables (CASSANDRA-14912)
+ * Drop/add column name with different Kind can result in corruption (CASSANDRA-14843)
  * Fix missing rows when reading 2.1 SSTables with static columns in 3.0 (CASSANDRA-14873)
  * Move TWCS message 'No compaction necessary for bucket size' to Trace level (CASSANDRA-14884)
  * Sstable min/max metadata can cause data loss (CASSANDRA-14861)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b1f40d5/src/java/org/apache/cassandra/db/SerializationHeader.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SerializationHeader.java b/src/java/org/apache/cassandra/db/SerializationHeader.java
index b2ed26e..5c4f518 100644
--- a/src/java/org/apache/cassandra/db/SerializationHeader.java
+++ b/src/java/org/apache/cassandra/db/SerializationHeader.java
@@ -325,31 +325,37 @@ public class SerializationHeader
         public SerializationHeader toHeader(CFMetaData metadata)
         {
             Map<ByteBuffer, AbstractType<?>> typeMap = new HashMap<>(staticColumns.size() + regularColumns.size());
-            typeMap.putAll(staticColumns);
-            typeMap.putAll(regularColumns);
 
             PartitionColumns.Builder builder = PartitionColumns.builder();
-            for (ByteBuffer name : typeMap.keySet())
+            for (Map<ByteBuffer, AbstractType<?>> map : ImmutableList.of(staticColumns, regularColumns))
             {
-                ColumnDefinition column = metadata.getColumnDefinition(name);
-
-                if (column == null)
+                boolean isStatic = map == staticColumns;
+                for (Map.Entry<ByteBuffer, AbstractType<?>> e : map.entrySet())
                 {
-                    // TODO: this implies we don't read data for a column we don't yet know about, which implies this is theoretically
-                    // racy with column addition. Currently, it is up to the user to not write data before the schema has propagated
-                    // and this is far from being the only place that has such a problem in practice. This doesn't mean we shouldn't
-                    // improve this.
-
-                    // If we don't find the definition, it could be we have data for a dropped column, and we shouldn't
-                    // fail deserialization because of that. So we grab a "fake" ColumnDefinition that ensures proper
-                    // deserialization. The column will be ignored later on anyway.
-                    boolean isStatic = staticColumns.containsKey(name);
-                    column = metadata.getDroppedColumnDefinition(name, isStatic);
-                    if (column == null)
-                        throw new RuntimeException("Unknown column " + UTF8Type.instance.getString(name) + " during deserialization");
+                    ByteBuffer name = e.getKey();
+                    AbstractType<?> other = typeMap.put(name, e.getValue());
+                    if (other != null && !other.equals(e.getValue()))
+                        throw new IllegalStateException("Column " + name + " occurs as both regular and static with types " + other + " and " + e.getValue());
+
+                    ColumnDefinition column = metadata.getColumnDefinition(name);
+                    if (column == null || column.isStatic() != isStatic)
+                    {
+                        // TODO: this implies we don't read data for a column we don't yet know about, which implies this is theoretically
+                        // racy with column addition. Currently, it is up to the user to not write data before the schema has propagated
+                        // and this is far from being the only place that has such a problem in practice. This doesn't mean we shouldn't
+                        // improve this.
+
+                        // If we don't find the definition, it could be we have data for a dropped column, and we shouldn't
+                        // fail deserialization because of that. So we grab a "fake" ColumnDefinition that ensures proper
+                        // deserialization. The column will be ignored later on anyway.
+                        column = metadata.getDroppedColumnDefinition(name, isStatic);
+                        if (column == null)
+                            throw new RuntimeException("Unknown column " + UTF8Type.instance.getString(name) + " during deserialization");
+                    }
+                    builder.add(column);
                 }
-                builder.add(column);
             }
+
             return new SerializationHeader(true, keyType, clusteringTypes, builder.build(), stats, typeMap);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/4b1f40d5/test/unit/org/apache/cassandra/db/SerializationHeaderTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/SerializationHeaderTest.java b/test/unit/org/apache/cassandra/db/SerializationHeaderTest.java
new file mode 100644
index 0000000..3e9f3bc
--- /dev/null
+++ b/test/unit/org/apache/cassandra/db/SerializationHeaderTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.db;
+
+import com.google.common.io.Files;
+import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.config.ColumnDefinition;
+import org.apache.cassandra.cql3.ColumnIdentifier;
+import org.apache.cassandra.db.compaction.OperationType;
+import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
+import org.apache.cassandra.db.marshal.Int32Type;
+import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.rows.BTreeRow;
+import org.apache.cassandra.db.rows.BufferCell;
+import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.db.rows.Row;
+import org.apache.cassandra.db.rows.UnfilteredRowIterator;
+import org.apache.cassandra.io.sstable.Component;
+import org.apache.cassandra.io.sstable.Descriptor;
+import org.apache.cassandra.io.sstable.ISSTableScanner;
+import org.apache.cassandra.io.sstable.format.SSTableFormat;
+import org.apache.cassandra.io.sstable.format.SSTableReader;
+import org.apache.cassandra.io.sstable.format.SSTableWriter;
+import org.apache.cassandra.io.sstable.format.big.BigFormat;
+import org.apache.cassandra.io.sstable.format.big.BigTableWriter;
+import org.apache.cassandra.io.util.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+
+public class SerializationHeaderTest
+{
+    private static String KEYSPACE = "SerializationHeaderTest";
+
+    @Test
+    public void testWrittenAsDifferentKind() throws Exception
+    {
+        final String tableName = "testWrittenAsDifferentKind";
+        final String schemaCqlWithStatic = String.format("CREATE TABLE %s (k int, c int, v int static, PRIMARY KEY(k, c))", tableName);
+        final String schemaCqlWithRegular = String.format("CREATE TABLE %s (k int, c int, v int, PRIMARY KEY(k, c))", tableName);
+        ColumnIdentifier v = ColumnIdentifier.getInterned("v", false);
+        CFMetaData schemaWithStatic = CFMetaData.compile(schemaCqlWithStatic, KEYSPACE);
+        CFMetaData schemaWithRegular = CFMetaData.compile(schemaCqlWithRegular, KEYSPACE);
+        ColumnDefinition columnStatic = schemaWithStatic.getColumnDefinition(v);
+        ColumnDefinition columnRegular = schemaWithRegular.getColumnDefinition(v);
+        schemaWithStatic.recordColumnDrop(columnRegular, 0L);
+        schemaWithRegular.recordColumnDrop(columnStatic, 0L);
+
+        final AtomicInteger generation = new AtomicInteger();
+        File dir = Files.createTempDir();
+        try
+        {
+            BiFunction<CFMetaData, Function<ByteBuffer, Clustering>, Callable<Descriptor>> writer = (schema, clusteringFunction) -> () -> {
+                Descriptor descriptor = new Descriptor(BigFormat.latestVersion, dir, schema.ksName, schema.cfName, generation.incrementAndGet(), SSTableFormat.Type.BIG, Component.DIGEST_CRC32);
+
+                SerializationHeader header = SerializationHeader.makeWithoutStats(schema);
+                try (LifecycleTransaction txn = LifecycleTransaction.offline(OperationType.WRITE);
+                     SSTableWriter sstableWriter = BigTableWriter.create(schema, descriptor, 1, 0L, 0, header, txn))
+                {
+                    ColumnDefinition cd = schema.getColumnDefinition(v);
+                    for (int i = 0 ; i < 5 ; ++i) {
+                        final ByteBuffer value = Int32Type.instance.decompose(i);
+                        Cell cell = BufferCell.live(schema, cd, 1L, value);
+                        Clustering clustering = clusteringFunction.apply(value);
+                        Row row = BTreeRow.singleCellRow(clustering, cell);
+                        sstableWriter.append(PartitionUpdate.singleRowUpdate(schema, value, row).unfilteredIterator());
+                    }
+                    sstableWriter.finish(false);
+                    txn.finish();
+                }
+                return descriptor;
+            };
+
+            Descriptor sstableWithRegular = writer.apply(schemaWithRegular, Clustering::new).call();
+            Descriptor sstableWithStatic = writer.apply(schemaWithStatic, value -> Clustering.STATIC_CLUSTERING).call();
+            SSTableReader readerWithStatic = SSTableReader.openNoValidation(sstableWithStatic, schemaWithRegular);
+            SSTableReader readerWithRegular = SSTableReader.openNoValidation(sstableWithRegular, schemaWithStatic);
+
+            try (ISSTableScanner partitions = readerWithStatic.getScanner()) {
+                for (int i = 0 ; i < 5 ; ++i)
+                {
+                    UnfilteredRowIterator partition = partitions.next();
+                    Assert.assertFalse(partition.hasNext());
+                    long value = Int32Type.instance.compose(partition.staticRow().getCell(columnStatic).value());
+                    Assert.assertEquals(value, (long)i);
+                }
+                Assert.assertFalse(partitions.hasNext());
+            }
+            try (ISSTableScanner partitions = readerWithRegular.getScanner()) {
+                for (int i = 0 ; i < 5 ; ++i)
+                {
+                    UnfilteredRowIterator partition = partitions.next();
+                    long value = Int32Type.instance.compose(((Row)partition.next()).getCell(columnRegular).value());
+                    Assert.assertEquals(value, (long)i);
+                    Assert.assertTrue(partition.staticRow().isEmpty());
+                    Assert.assertFalse(partition.hasNext());
+                }
+                Assert.assertFalse(partitions.hasNext());
+            }
+        }
+        finally
+        {
+            FileUtils.deleteRecursive(dir);
+        }
+    }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to