Validate size of indexed column values patch by Sam Tunnicliffe; reviewed by Aleksey Yeschenko for CASSANDRA-8280
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0e3d9fc1 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0e3d9fc1 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0e3d9fc1 Branch: refs/heads/trunk Commit: 0e3d9fc14bfcb38b9f179c0428cf586890c4a8ab Parents: 2ce1ad8 Author: Sam Tunnicliffe <s...@beobal.com> Authored: Mon Nov 24 14:50:14 2014 +0300 Committer: Aleksey Yeschenko <alek...@apache.org> Committed: Mon Nov 24 14:50:14 2014 +0300 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/cql3/CFDefinition.java | 5 + .../cassandra/cql3/ColumnNameBuilder.java | 7 ++ .../cql3/statements/ModificationStatement.java | 20 +-- .../cql3/statements/UpdateStatement.java | 22 +++- .../db/index/SecondaryIndexManager.java | 6 +- .../cassandra/db/marshal/CompositeType.java | 9 ++ .../cassandra/io/sstable/SSTableWriter.java | 9 ++ .../cql3/IndexedValuesValidationTest.java | 124 +++++++++++++++++++ 9 files changed, 192 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3d9fc1/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 6a5ac0d..412eb59 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 2.0.12: + * Validate size of indexed column values (CASSANDRA-8280) * Make LCS split compaction results over all data directories (CASSANDRA-8329) * Fix some failing queries that use multi-column relations on COMPACT STORAGE tables (CASSANDRA-8264) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3d9fc1/src/java/org/apache/cassandra/cql3/CFDefinition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/CFDefinition.java b/src/java/org/apache/cassandra/cql3/CFDefinition.java index 23bedaf..e0bb409 100644 --- a/src/java/org/apache/cassandra/cql3/CFDefinition.java +++ b/src/java/org/apache/cassandra/cql3/CFDefinition.java @@ -358,5 +358,10 @@ public class CFDefinition implements Iterable<CFDefinition.Name> return columnName; } + + public int getLength() + { + return columnName == null ? 0 : columnName.remaining(); + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3d9fc1/src/java/org/apache/cassandra/cql3/ColumnNameBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/ColumnNameBuilder.java b/src/java/org/apache/cassandra/cql3/ColumnNameBuilder.java index 3d5eff6..50cdc74 100644 --- a/src/java/org/apache/cassandra/cql3/ColumnNameBuilder.java +++ b/src/java/org/apache/cassandra/cql3/ColumnNameBuilder.java @@ -78,4 +78,11 @@ public interface ColumnNameBuilder */ public ByteBuffer getComponent(int i); + /** + * Returns the total length of the ByteBuffer that will + * be returned by build(). + * @return the total length of the column name to be built + */ + public int getLength(); + } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3d9fc1/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 61f65c1..db22e7d 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -22,8 +22,6 @@ import java.util.*; import com.google.common.base.Function; import com.google.common.collect.Iterables; -import org.apache.cassandra.db.marshal.AbstractType; -import org.github.jamm.MemoryMeter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +41,9 @@ import org.apache.cassandra.service.QueryState; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.thrift.ThriftValidation; import org.apache.cassandra.transport.messages.ResultMessage; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; +import org.github.jamm.MemoryMeter; /* * Abstract parent class of individual modifications, i.e. INSERT, UPDATE and DELETE. @@ -328,7 +328,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF throws InvalidRequestException { CFDefinition cfDef = cfm.getCfDef(); - ColumnNameBuilder keyBuilder = cfDef.getKeyNameBuilder(); + ColumnNameBuilder keyBuilderBase = cfDef.getKeyNameBuilder(); List<ByteBuffer> keys = new ArrayList<ByteBuffer>(); for (CFDefinition.Name name : cfDef.partitionKeys()) { @@ -337,14 +337,19 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF throw new InvalidRequestException(String.format("Missing mandatory PRIMARY KEY part %s", name)); List<ByteBuffer> values = r.values(variables); - - if (keyBuilder.remainingCount() == 1) + if (keyBuilderBase.remainingCount() == 1) { for (ByteBuffer val : values) { if (val == null) throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", name)); - ByteBuffer key = keyBuilder.copy().add(val).build(); + + ColumnNameBuilder keyBuilder = keyBuilderBase.copy().add(val); + if (keyBuilder.getLength() > FBUtilities.MAX_UNSIGNED_SHORT) + throw new InvalidRequestException(String.format("Partition key size %s exceeds maximum %s", + keyBuilder.getLength(), + FBUtilities.MAX_UNSIGNED_SHORT)); + ByteBuffer key = keyBuilder.build(); ThriftValidation.validateKey(cfm, key); keys.add(key); } @@ -356,7 +361,7 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF ByteBuffer val = values.get(0); if (val == null) throw new InvalidRequestException(String.format("Invalid null value for partition key part %s", name)); - keyBuilder.add(val); + keyBuilderBase.add(val); } } return keys; @@ -727,7 +732,6 @@ public abstract class ModificationStatement implements CQLStatement, MeasurableF Collection<IMutation> mutations = new ArrayList<IMutation>(); for (ByteBuffer key: keys) { - ThriftValidation.validateKey(cfm, key); ColumnFamily cf = UnsortedColumns.factory.create(cfm); addUpdateForKey(cf, key, clusteringPrefix, params); RowMutation rm = new RowMutation(cfm.ksName, key, cf); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3d9fc1/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java index e2da251..9d98c84 100644 --- a/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/UpdateStatement.java @@ -23,9 +23,10 @@ import java.util.*; import org.apache.cassandra.cql3.*; import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.db.*; -import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.exceptions.*; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Pair; /** @@ -51,6 +52,11 @@ public class UpdateStatement extends ModificationStatement { CFDefinition cfDef = cfm.getCfDef(); + if (builder.getLength() > FBUtilities.MAX_UNSIGNED_SHORT) + throw new InvalidRequestException(String.format("The sum of all clustering columns is too long (%s > %s)", + builder.getLength(), + FBUtilities.MAX_UNSIGNED_SHORT)); + // Inserting the CQL row marker (see #4361) // We always need to insert a marker for INSERT, because of the following situation: // CREATE TABLE t ( k int PRIMARY KEY, c text ); @@ -99,6 +105,20 @@ public class UpdateStatement extends ModificationStatement for (Operation update : updates) update.execute(key, cf, builder.copy(), params); } + + SecondaryIndexManager indexManager = Keyspace.open(cfm.ksName).getColumnFamilyStore(cfm.cfId).indexManager; + if (indexManager.hasIndexes()) + { + for (Column column : cf) + { + if (!indexManager.validate(column)) + throw new InvalidRequestException(String.format("Can't index column value of size %d for index %s on %s.%s", + column.value().remaining(), + cfm.getColumnDefinitionFromColumnName(column.name()).getIndexName(), + cfm.ksName, + cfm.cfName)); + } + } } public static class ParsedInsert extends ModificationStatement.Parsed http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3d9fc1/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java index 6d9f28a..fda79f8 100644 --- a/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java +++ b/src/java/org/apache/cassandra/db/index/SecondaryIndexManager.java @@ -574,8 +574,10 @@ public class SecondaryIndexManager public boolean validate(Column column) { - SecondaryIndex index = getIndexForColumn(column.name()); - return index == null || index.validate(column); + for (SecondaryIndex index : indexFor(column.name())) + if (!index.validate(column)) + return false; + return true; } public static interface Updater http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3d9fc1/src/java/org/apache/cassandra/db/marshal/CompositeType.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/marshal/CompositeType.java b/src/java/org/apache/cassandra/db/marshal/CompositeType.java index 946ba24..f0d9d9b 100644 --- a/src/java/org/apache/cassandra/db/marshal/CompositeType.java +++ b/src/java/org/apache/cassandra/db/marshal/CompositeType.java @@ -504,5 +504,14 @@ public class CompositeType extends AbstractCompositeType return components.get(i); } + + public int getLength() + { + int length = 0; + for (ByteBuffer component : components) + length += component.remaining() + 3; // length + 2 bytes for length + EOC + + return length; + } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3d9fc1/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java index 4619ddc..afa066d 100644 --- a/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/SSTableWriter.java @@ -36,6 +36,7 @@ import org.apache.cassandra.io.compress.CompressedSequentialWriter; import org.apache.cassandra.io.util.*; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.FilterFactory; import org.apache.cassandra.utils.IFilter; import org.apache.cassandra.utils.Pair; @@ -181,6 +182,14 @@ public class SSTableWriter extends SSTable public void append(DecoratedKey decoratedKey, ColumnFamily cf) { + if (decoratedKey.key.remaining() > FBUtilities.MAX_UNSIGNED_SHORT) + { + logger.error("Key size {} exceeds maximum of {}, skipping row", + decoratedKey.key.remaining(), + FBUtilities.MAX_UNSIGNED_SHORT); + return; + } + long startPosition = beforeAppend(decoratedKey); try { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0e3d9fc1/test/unit/org/apache/cassandra/cql3/IndexedValuesValidationTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/IndexedValuesValidationTest.java b/test/unit/org/apache/cassandra/cql3/IndexedValuesValidationTest.java new file mode 100644 index 0000000..9c2bc0f --- /dev/null +++ b/test/unit/org/apache/cassandra/cql3/IndexedValuesValidationTest.java @@ -0,0 +1,124 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ +package org.apache.cassandra.cql3; + +import java.nio.ByteBuffer; +import java.util.Collections; +import org.junit.BeforeClass; +import org.junit.Test; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.exceptions.InvalidRequestException; +import org.apache.cassandra.exceptions.RequestExecutionException; +import org.apache.cassandra.exceptions.RequestValidationException; +import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.QueryState; +import org.apache.cassandra.utils.MD5Digest; + +import static org.junit.Assert.fail; +import static org.apache.cassandra.cql3.QueryProcessor.process; + +public class IndexedValuesValidationTest +{ + static ClientState clientState; + static String keyspace = "indexed_value_validation_test"; + + @BeforeClass + public static void setUpClass() throws Throwable + { + SchemaLoader.loadSchema(); + executeSchemaChange("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'}"); + clientState = ClientState.forInternalCalls(); + } + + // CASSANDRA-8280/8081 + // reject updates with indexed values where value > 64k + @Test + public void testIndexOnCompositeValueOver64k() throws Throwable + { + executeSchemaChange("CREATE TABLE %s.composite_index_table (a int, b int, c blob, PRIMARY KEY (a))"); + executeSchemaChange("CREATE INDEX ON %s.composite_index_table(c)"); + performInsertWithIndexedValueOver64k("INSERT INTO %s.composite_index_table (a, b, c) VALUES (0, 0, ?)"); + } + + @Test + public void testIndexOnClusteringValueOver64k() throws Throwable + { + executeSchemaChange("CREATE TABLE %s.ck_index_table (a int, b blob, c int, PRIMARY KEY (a, b))"); + executeSchemaChange("CREATE INDEX ON %s.ck_index_table(b)"); + performInsertWithIndexedValueOver64k("INSERT INTO %s.ck_index_table (a, b, c) VALUES (0, ?, 0)"); + } + + @Test + public void testIndexOnPartitionKeyOver64k() throws Throwable + { + executeSchemaChange("CREATE TABLE %s.pk_index_table (a blob, b int, c int, PRIMARY KEY ((a, b)))"); + executeSchemaChange("CREATE INDEX ON %s.pk_index_table(a)"); + performInsertWithIndexedValueOver64k("INSERT INTO %s.pk_index_table (a, b, c) VALUES (?, 0, 0)"); + } + + @Test + public void testCompactTableWithValueOver64k() throws Throwable + { + executeSchemaChange("CREATE TABLE %s.compact_table (a int, b blob, PRIMARY KEY (a)) WITH COMPACT STORAGE"); + executeSchemaChange("CREATE INDEX ON %s.compact_table(b)"); + performInsertWithIndexedValueOver64k("INSERT INTO %s.compact_table (a, b) VALUES (0, ?)"); + } + + private static void performInsertWithIndexedValueOver64k(String insertCQL) throws Exception + { + ByteBuffer buf = ByteBuffer.allocate(1024 * 65); + buf.clear(); + for (int i=0; i<1024 + 1; i++) + buf.put((byte)0); + + try + { + execute(String.format(insertCQL, keyspace), buf); + fail("Expected statement to fail validation"); + } + catch (InvalidRequestException e) + { + // as expected + } + } + + private static void execute(String query, ByteBuffer value) throws RequestValidationException, RequestExecutionException + { + MD5Digest statementId = QueryProcessor.prepare(String.format(query, keyspace), clientState, false).statementId; + CQLStatement statement = QueryProcessor.instance.getPrepared(statementId); + statement.executeInternal(QueryState.forInternalCalls(), + new QueryOptions(ConsistencyLevel.ONE, Collections.singletonList(value))); + } + + private static void executeSchemaChange(String query) throws Throwable + { + try + { + process(String.format(query, keyspace), ConsistencyLevel.ONE); + } + catch (RuntimeException exc) + { + throw exc.getCause(); + } + } +} +