Fix support for SuperColumn tables. Patch by Alex Petrov; reviewed by Sylvain Lebresne for CASSANDRA-12373.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/ce8c9b55 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/ce8c9b55 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/ce8c9b55 Branch: refs/heads/trunk Commit: ce8c9b559f48e72cb4488e75211be338d28bdb13 Parents: 69e6e0e Author: Alex Petrov <[email protected]> Authored: Thu Oct 13 11:46:45 2016 +0200 Committer: Alex Petrov <[email protected]> Committed: Mon Sep 25 09:16:30 2017 +0200 ---------------------------------------------------------------------- CHANGES.txt | 1 + .../org/apache/cassandra/config/CFMetaData.java | 201 +++- .../apache/cassandra/cql3/AbstractMarker.java | 10 +- .../apache/cassandra/cql3/ColumnCondition.java | 6 + .../apache/cassandra/cql3/ColumnConditions.java | 6 + .../org/apache/cassandra/cql3/Constants.java | 3 +- src/java/org/apache/cassandra/cql3/Maps.java | 85 ++ .../cassandra/cql3/MultiColumnRelation.java | 64 +- .../org/apache/cassandra/cql3/Operation.java | 81 ++ .../org/apache/cassandra/cql3/Relation.java | 10 + .../cassandra/cql3/SingleColumnRelation.java | 76 +- .../cql3/SuperColumnCompatibility.java | 765 +++++++++++++++ .../apache/cassandra/cql3/UpdateParameters.java | 9 +- .../org/apache/cassandra/cql3/WhereClause.java | 13 +- .../restrictions/PrimaryKeyRestrictionSet.java | 2 - .../restrictions/SingleColumnRestriction.java | 206 +++- .../restrictions/StatementRestrictions.java | 26 +- .../cassandra/cql3/restrictions/TermSlice.java | 2 +- .../cql3/statements/CreateViewStatement.java | 2 + .../cql3/statements/DeleteStatement.java | 30 +- .../cql3/statements/ModificationStatement.java | 15 +- .../cql3/statements/SelectStatement.java | 59 +- .../cql3/statements/UpdateStatement.java | 107 ++- src/java/org/apache/cassandra/db/Columns.java | 1 + .../org/apache/cassandra/db/CompactTables.java | 44 +- .../org/apache/cassandra/db/LegacyLayout.java | 3 +- .../cassandra/db/SerializationHeader.java | 
1 + .../cassandra/schema/LegacySchemaMigrator.java | 39 +- .../cassandra/thrift/CassandraServer.java | 7 +- .../cassandra/thrift/ThriftConversion.java | 15 +- .../unit/org/apache/cassandra/SchemaLoader.java | 19 +- .../org/apache/cassandra/cql3/ViewTest.java | 31 +- .../cql3/validation/ThriftIntegrationTest.java | 942 +++++++++++++++++++ .../validation/operations/ThriftCQLTester.java | 90 ++ .../db/ColumnFamilyStoreCQLHelperTest.java | 12 +- .../schema/LegacySchemaMigratorTest.java | 13 +- 36 files changed, 2778 insertions(+), 218 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce8c9b55/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 9cba02b..7745e8c 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 3.0.15 + * Fix support for SuperColumn tables (CASSANDRA-12373) * Handle limit correctly on tables with strict liveness (CASSANDRA-13883) * Fix missing original update in TriggerExecutor (CASSANDRA-13894) * Remove non-rpc-ready nodes from counter leader candidates (CASSANDRA-13043) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce8c9b55/src/java/org/apache/cassandra/config/CFMetaData.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java index 1eb991a..fd1c9e5 100644 --- a/src/java/org/apache/cassandra/config/CFMetaData.java +++ b/src/java/org/apache/cassandra/config/CFMetaData.java @@ -31,6 +31,8 @@ import com.google.common.base.MoreObjects; import com.google.common.base.Objects; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import com.google.common.collect.Iterators; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.commons.lang3.ArrayUtils; import 
org.apache.commons.lang3.builder.HashCodeBuilder; @@ -40,6 +42,7 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.cql3.ColumnIdentifier; import org.apache.cassandra.cql3.QueryProcessor; +import org.apache.cassandra.cql3.SuperColumnCompatibility; import org.apache.cassandra.cql3.statements.CFStatement; import org.apache.cassandra.cql3.statements.CreateTableStatement; import org.apache.cassandra.db.*; @@ -114,6 +117,33 @@ public final class CFMetaData // for those tables in practice). private volatile ColumnDefinition compactValueColumn; + /** + * These two columns are "virtual" (e.g. not persisted together with schema). + * + * They are stored here to avoid re-creating during SELECT and UPDATE queries, where + * they are used to allow presenting supercolumn families in the CQL-compatible + * format. See {@link SuperColumnCompatibility} for more details. + **/ + private volatile ColumnDefinition superCfKeyColumn; + private volatile ColumnDefinition superCfValueColumn; + + public boolean isSuperColumnKeyColumn(ColumnDefinition cd) + { + return cd.name.equals(superCfKeyColumn.name); + } + + public boolean isSuperColumnValueColumn(ColumnDefinition cd) + { + return cd.name.equals(superCfValueColumn.name); + } + + public ColumnDefinition superColumnValueColumn() + { + return superCfValueColumn; + } + + public ColumnDefinition superColumnKeyColumn() { return superCfKeyColumn; } + /* * All of these methods will go away once CFMetaData becomes completely immutable. 
*/ @@ -242,7 +272,9 @@ public final class CFMetaData List<ColumnDefinition> partitionKeyColumns, List<ColumnDefinition> clusteringColumns, PartitionColumns partitionColumns, - IPartitioner partitioner) + IPartitioner partitioner, + ColumnDefinition superCfKeyColumn, + ColumnDefinition superCfValueColumn) { this.cfId = cfId; this.ksName = keyspace; @@ -253,7 +285,8 @@ public final class CFMetaData ksAndCFBytes = Arrays.copyOf(ksBytes, ksBytes.length + cfBytes.length); System.arraycopy(cfBytes, 0, ksAndCFBytes, ksBytes.length, cfBytes.length); - this.isDense = isDense; + this.isDense = isSuper ? (isDense || SuperColumnCompatibility.recalculateIsDense(partitionColumns.regulars)) : isDense; + this.isCompound = isCompound; this.isSuper = isSuper; this.isCounter = isCounter; @@ -283,32 +316,64 @@ public final class CFMetaData this.clusteringColumns = clusteringColumns; this.partitionColumns = partitionColumns; - this.serializers = new Serializers(this); + this.superCfKeyColumn = superCfKeyColumn; + this.superCfValueColumn = superCfValueColumn; + //This needs to happen before serializers are set + //because they use comparator.subtypes() rebuild(); + + this.serializers = new Serializers(this); } // This rebuild informations that are intrinsically duplicate of the table definition but // are kept because they are often useful in a different format. private void rebuild() { - this.comparator = new ClusteringComparator(extractTypes(clusteringColumns)); + if (isCompactTable()) + { + this.compactValueColumn = isSuper() ? 
+ SuperColumnCompatibility.getCompactValueColumn(partitionColumns) : + CompactTables.getCompactValueColumn(partitionColumns); + } - Map<ByteBuffer, ColumnDefinition> newColumnMetadata = new HashMap<>(); - for (ColumnDefinition def : partitionKeyColumns) - newColumnMetadata.put(def.name.bytes, def); - for (ColumnDefinition def : clusteringColumns) - newColumnMetadata.put(def.name.bytes, def); - for (ColumnDefinition def : partitionColumns) - newColumnMetadata.put(def.name.bytes, def); + Map<ByteBuffer, ColumnDefinition> newColumnMetadata = Maps.newHashMapWithExpectedSize(partitionKeyColumns.size() + clusteringColumns.size() + partitionColumns.size()); + if (isSuper() && isDense()) + { + CompactTables.DefaultNames defaultNames = SuperColumnCompatibility.columnNameGenerator(partitionKeyColumns, clusteringColumns, partitionColumns); + if (superCfKeyColumn == null) + superCfKeyColumn = SuperColumnCompatibility.getSuperCfKeyColumn(this, clusteringColumns, defaultNames); + if (superCfValueColumn == null) + superCfValueColumn = SuperColumnCompatibility.getSuperCfValueColumn(this, partitionColumns, superCfKeyColumn, defaultNames); + + for (ColumnDefinition def : partitionKeyColumns) + newColumnMetadata.put(def.name.bytes, def); + newColumnMetadata.put(clusteringColumns.get(0).name.bytes, clusteringColumns.get(0)); + newColumnMetadata.put(superCfKeyColumn.name.bytes, SuperColumnCompatibility.getSuperCfSschemaRepresentation(superCfKeyColumn)); + newColumnMetadata.put(superCfValueColumn.name.bytes, superCfValueColumn); + newColumnMetadata.put(compactValueColumn.name.bytes, compactValueColumn); + clusteringColumns = Arrays.asList(clusteringColumns().get(0)); + partitionColumns = PartitionColumns.of(compactValueColumn); + } + else + { + for (ColumnDefinition def : partitionKeyColumns) + newColumnMetadata.put(def.name.bytes, def); + for (ColumnDefinition def : clusteringColumns) + newColumnMetadata.put(def.name.bytes, def); + for (ColumnDefinition def : partitionColumns) + 
newColumnMetadata.put(def.name.bytes, def); + } this.columnMetadata = newColumnMetadata; List<AbstractType<?>> keyTypes = extractTypes(partitionKeyColumns); this.keyValidator = keyTypes.size() == 1 ? keyTypes.get(0) : CompositeType.getInstance(keyTypes); - if (isCompactTable()) - this.compactValueColumn = CompactTables.getCompactValueColumn(partitionColumns, isSuper()); + if (isSuper()) + this.comparator = new ClusteringComparator(clusteringColumns.get(0).type); + else + this.comparator = new ClusteringComparator(extractTypes(clusteringColumns)); } public Indexes getIndexes() @@ -361,7 +426,9 @@ public final class CFMetaData partitions, clusterings, builder.build(), - partitioner); + partitioner, + null, + null); } private static List<AbstractType<?>> extractTypes(List<ColumnDefinition> clusteringColumns) @@ -462,7 +529,9 @@ public final class CFMetaData copy(partitionKeyColumns), copy(clusteringColumns), copy(partitionColumns), - partitioner), + partitioner, + superCfKeyColumn, + superCfValueColumn), this); } @@ -479,7 +548,9 @@ public final class CFMetaData copy(partitionKeyColumns), copy(clusteringColumns), copy(partitionColumns), - partitioner), + partitioner, + superCfKeyColumn, + superCfValueColumn), this); } @@ -578,22 +649,39 @@ public final class CFMetaData return columnMetadata.values(); } + private Iterator<ColumnDefinition> nonPkColumnIterator() + { + final boolean noNonPkColumns = isCompactTable() && CompactTables.hasEmptyCompactValue(this) && !isSuper(); + if (noNonPkColumns) + { + return Collections.<ColumnDefinition>emptyIterator(); + } + else if (isStaticCompactTable()) + { + return partitionColumns.statics.selectOrderIterator(); + } + else if (isSuper()) + { + if (isDense) + return Iterators.forArray(superCfKeyColumn, superCfValueColumn); + else + return Iterators.filter(partitionColumns.iterator(), (c) -> !c.type.isCollection()); + } + else + return partitionColumns().selectOrderIterator(); + } + // An iterator over all column definitions but 
that respect the order of a SELECT *. - // This also "hide" the clustering/regular columns for a non-CQL3 non-dense table for backward compatibility - // sake (those are accessible through thrift but not through CQL currently). + // This also hides the clustering/regular columns for a non-CQL3 non-dense table for backward compatibility + // sake (those are accessible through thrift but not through CQL currently) and exposes the key and value + // columns for supercolumn family. public Iterator<ColumnDefinition> allColumnsInSelectOrder() { - final boolean isStaticCompactTable = isStaticCompactTable(); - final boolean noNonPkColumns = isCompactTable() && CompactTables.hasEmptyCompactValue(this); return new AbstractIterator<ColumnDefinition>() { private final Iterator<ColumnDefinition> partitionKeyIter = partitionKeyColumns.iterator(); - private final Iterator<ColumnDefinition> clusteringIter = isStaticCompactTable ? Collections.<ColumnDefinition>emptyIterator() : clusteringColumns.iterator(); - private final Iterator<ColumnDefinition> otherColumns = noNonPkColumns - ? Collections.<ColumnDefinition>emptyIterator() - : (isStaticCompactTable - ? partitionColumns.statics.selectOrderIterator() - : partitionColumns.selectOrderIterator()); + private final Iterator<ColumnDefinition> clusteringIter = isStaticCompactTable() ? 
Collections.<ColumnDefinition>emptyIterator() : clusteringColumns.iterator(); + private final Iterator<ColumnDefinition> otherColumns = nonPkColumnIterator(); protected ColumnDefinition computeNext() { @@ -751,6 +839,8 @@ public final class CFMetaData boolean changeAffectsStatements = !partitionColumns.equals(cfm.partitionColumns); partitionColumns = cfm.partitionColumns; + superCfKeyColumn = cfm.superCfKeyColumn; + superCfValueColumn = cfm.superCfValueColumn; rebuild(); @@ -784,8 +874,12 @@ public final class CFMetaData if (!cfm.cfId.equals(cfId)) throw new ConfigurationException(String.format("Column family ID mismatch (found %s; expected %s)", cfm.cfId, cfId)); - if (!cfm.flags.equals(flags)) - throw new ConfigurationException("types do not match."); + + // Dense flag can get set, see CASSANDRA-12373 for details. We have to remove flag from both parts because + // there's no guaranteed call order in the call. + + if (!cfm.flags.equals(flags) && (!isSuper() || !Sets.difference(cfm.flags, Sets.immutableEnumSet(Flag.DENSE)).equals(Sets.difference(flags, Sets.immutableEnumSet(Flag.DENSE))))) + throw new ConfigurationException("Types do not match: " + cfm.flags + " != " + flags); } @@ -819,7 +913,7 @@ public final class CFMetaData */ public ColumnDefinition getColumnDefinition(ColumnIdentifier name) { - return columnMetadata.get(name.bytes); + return getColumnDefinition(name.bytes); } // In general it is preferable to work with ColumnIdentifier to make it @@ -859,8 +953,8 @@ public final class CFMetaData if (isCounter()) { for (ColumnDefinition def : partitionColumns()) - if (!(def.type instanceof CounterColumnType) && !CompactTables.isSuperColumnMapColumn(def)) - throw new ConfigurationException("Cannot add a non counter column (" + def.name + ") in a counter column family"); + if (!(def.type instanceof CounterColumnType) && (!isSuper() || isSuperColumnValueColumn(def))) + throw new ConfigurationException("Cannot add a non counter column (" + def + ") in a counter 
column family"); } else { @@ -874,6 +968,7 @@ public final class CFMetaData // initialize a set of names NOT in the CF under consideration KeyspaceMetadata ksm = Schema.instance.getKSMetaData(ksName); + Set<String> indexNames = ksm == null ? new HashSet<>() : ksm.existingIndexNames(cfName); for (IndexMetadata index : indexes) { @@ -888,8 +983,6 @@ public final class CFMetaData return this; } - - // The comparator to validate the definition name with thrift. public AbstractType<?> thriftColumnNameType() { @@ -963,13 +1056,14 @@ public final class CFMetaData public void renameColumn(ColumnIdentifier from, ColumnIdentifier to) throws InvalidRequestException { ColumnDefinition def = getColumnDefinition(from); + if (def == null) throw new InvalidRequestException(String.format("Cannot rename unknown column %s in keyspace %s", from, cfName)); if (getColumnDefinition(to) != null) throw new InvalidRequestException(String.format("Cannot rename column %s to %s in keyspace %s; another column of that name already exist", from, to, cfName)); - if (def.isPartOfCellName(isCQLTable(), isSuper())) + if (def.isPartOfCellName(isCQLTable(), isSuper()) && !isDense()) { throw new InvalidRequestException(String.format("Cannot rename non PRIMARY KEY part %s", from)); } @@ -987,8 +1081,28 @@ public final class CFMetaData .collect(Collectors.joining(",")))); } - ColumnDefinition newDef = def.withNewName(to); - addOrReplaceColumnDefinition(newDef); + if (isSuper() && isDense()) + { + if (isSuperColumnKeyColumn(def)) + { + columnMetadata.remove(superCfKeyColumn.name.bytes); + superCfKeyColumn = superCfKeyColumn.withNewName(to); + columnMetadata.put(superCfKeyColumn.name.bytes, SuperColumnCompatibility.getSuperCfSschemaRepresentation(superCfKeyColumn)); + } + else if (isSuperColumnValueColumn(def)) + { + columnMetadata.remove(superCfValueColumn.name.bytes); + superCfValueColumn = superCfValueColumn.withNewName(to); + columnMetadata.put(superCfValueColumn.name.bytes, superCfValueColumn); + } + 
else + addOrReplaceColumnDefinition(def.withNewName(to)); + } + else + { + addOrReplaceColumnDefinition(def.withNewName(to)); + } + // removeColumnDefinition doesn't work for partition key (expectedly) but renaming one is fine so we still // want to update columnMetadata. @@ -1098,9 +1212,12 @@ public final class CFMetaData public AbstractType<?> makeLegacyDefaultValidator() { - return isCounter() - ? CounterColumnType.instance - : (isCompactTable() ? compactValueColumn().type : BytesType.instance); + if (isCounter()) + return CounterColumnType.instance; + else if (isCompactTable()) + return isSuper() ? ((MapType)compactValueColumn().type).valueComparator() : compactValueColumn().type; + else + return BytesType.instance; } public static Set<Flag> flagsFromStrings(Set<String> strings) @@ -1198,7 +1315,7 @@ public final class CFMetaData public static Builder createSuper(String keyspace, String table, boolean isCounter) { - return create(keyspace, table, false, false, true, isCounter); + return create(keyspace, table, true, true, true, isCounter); } public Builder withPartitioner(IPartitioner partitioner) @@ -1314,7 +1431,9 @@ public final class CFMetaData partitions, clusterings, builder.build(), - partitioner.orElseGet(DatabaseDescriptor::getPartitioner)); + partitioner.orElseGet(DatabaseDescriptor::getPartitioner), + null, + null); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce8c9b55/src/java/org/apache/cassandra/cql3/AbstractMarker.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/AbstractMarker.java b/src/java/org/apache/cassandra/cql3/AbstractMarker.java index cd26bd7..14170b1 100644 --- a/src/java/org/apache/cassandra/cql3/AbstractMarker.java +++ b/src/java/org/apache/cassandra/cql3/AbstractMarker.java @@ -17,7 +17,6 @@ */ package org.apache.cassandra.cql3; -import java.util.Collections; import java.util.List; import org.apache.cassandra.cql3.functions.Function; @@ -58,7 
+57,7 @@ public abstract class AbstractMarker extends Term.NonTerminal */ public static class Raw extends Term.Raw { - protected final int bindIndex; + private final int bindIndex; public Raw(int bindIndex) { @@ -89,6 +88,11 @@ public abstract class AbstractMarker extends Term.NonTerminal { return "?"; } + + public int bindIndex() + { + return bindIndex; + } } /** A MultiColumnRaw version of AbstractMarker.Raw */ @@ -140,7 +144,7 @@ public abstract class AbstractMarker extends Term.NonTerminal @Override public AbstractMarker prepare(String keyspace, ColumnSpecification receiver) throws InvalidRequestException { - return new Lists.Marker(bindIndex, makeInReceiver(receiver)); + return new Lists.Marker(bindIndex(), makeInReceiver(receiver)); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce8c9b55/src/java/org/apache/cassandra/cql3/ColumnCondition.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/ColumnCondition.java b/src/java/org/apache/cassandra/cql3/ColumnCondition.java index 60e67f3..99e243c 100644 --- a/src/java/org/apache/cassandra/cql3/ColumnCondition.java +++ b/src/java/org/apache/cassandra/cql3/ColumnCondition.java @@ -60,6 +60,12 @@ public class ColumnCondition assert this.inValues == null; } + // Public for SuperColumn tables support only + public Term value() + { + return value; + } + public static ColumnCondition condition(ColumnDefinition column, Term value, Operator op) { return new ColumnCondition(column, null, value, null, op); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce8c9b55/src/java/org/apache/cassandra/cql3/ColumnConditions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/ColumnConditions.java b/src/java/org/apache/cassandra/cql3/ColumnConditions.java index cb09b1a..5ec4cb4 100644 --- a/src/java/org/apache/cassandra/cql3/ColumnConditions.java +++ 
b/src/java/org/apache/cassandra/cql3/ColumnConditions.java @@ -104,6 +104,12 @@ public final class ColumnConditions extends AbstractConditions staticConditions.forEach(p -> p.addFunctionsTo(functions)); } + // Public for SuperColumn tables support only + public Collection<ColumnCondition> columnConditions() + { + return this.columnConditions; + } + /** * Creates a new <code>Builder</code> for <code>ColumnConditions</code>. * @return a new <code>Builder</code> for <code>ColumnConditions</code> http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce8c9b55/src/java/org/apache/cassandra/cql3/Constants.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/Constants.java b/src/java/org/apache/cassandra/cql3/Constants.java index a2bacdf..f37d900 100644 --- a/src/java/org/apache/cassandra/cql3/Constants.java +++ b/src/java/org/apache/cassandra/cql3/Constants.java @@ -281,7 +281,8 @@ public abstract class Constants public static class Marker extends AbstractMarker { - protected Marker(int bindIndex, ColumnSpecification receiver) + // Constructor is public only for the SuperColumn tables support + public Marker(int bindIndex, ColumnSpecification receiver) { super(bindIndex, receiver); assert !receiver.type.isCollection(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce8c9b55/src/java/org/apache/cassandra/cql3/Maps.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/Maps.java b/src/java/org/apache/cassandra/cql3/Maps.java index 4772369..4b6f0fe 100644 --- a/src/java/org/apache/cassandra/cql3/Maps.java +++ b/src/java/org/apache/cassandra/cql3/Maps.java @@ -325,6 +325,91 @@ public abstract class Maps } } + // Currently only used internally counters support in SuperColumn families. + // Addition on the element level inside the collections are otherwise not supported in the CQL. 
+ public static class AdderByKey extends Operation + { + private final Term k; + + public AdderByKey(ColumnDefinition column, Term t, Term k) + { + super(column, t); + this.k = k; + } + + @Override + public void collectMarkerSpecification(VariableSpecifications boundNames) + { + super.collectMarkerSpecification(boundNames); + k.collectMarkerSpecification(boundNames); + } + + public void execute(DecoratedKey partitionKey, UpdateParameters params) throws InvalidRequestException + { + assert column.type.isMultiCell() : "Attempted to set a value for a single key on a frozen map"; + + ByteBuffer key = k.bindAndGet(params.options); + ByteBuffer value = t.bindAndGet(params.options); + + if (key == null) + throw new InvalidRequestException("Invalid null map key"); + if (key == ByteBufferUtil.UNSET_BYTE_BUFFER) + throw new InvalidRequestException("Invalid unset map key"); + + if (value == null) + throw new InvalidRequestException("Invalid null value for counter increment"); + if (value == ByteBufferUtil.UNSET_BYTE_BUFFER) + return; + + long increment = ByteBufferUtil.toLong(value); + params.addCounter(column, increment, CellPath.create(key)); + } + } + + // Currently only used internally counters support in SuperColumn families. + // Addition on the element level inside the collections are otherwise not supported in the CQL. 
+ public static class SubtracterByKey extends Operation + { + private final Term k; + + public SubtracterByKey(ColumnDefinition column, Term t, Term k) + { + super(column, t); + this.k = k; + } + + @Override + public void collectMarkerSpecification(VariableSpecifications boundNames) + { + super.collectMarkerSpecification(boundNames); + k.collectMarkerSpecification(boundNames); + } + + public void execute(DecoratedKey partitionKey, UpdateParameters params) throws InvalidRequestException + { + assert column.type.isMultiCell() : "Attempted to set a value for a single key on a frozen map"; + + ByteBuffer key = k.bindAndGet(params.options); + ByteBuffer value = t.bindAndGet(params.options); + + if (key == null) + throw new InvalidRequestException("Invalid null map key"); + if (key == ByteBufferUtil.UNSET_BYTE_BUFFER) + throw new InvalidRequestException("Invalid unset map key"); + + if (value == null) + throw new InvalidRequestException("Invalid null value for counter increment"); + if (value == ByteBufferUtil.UNSET_BYTE_BUFFER) + return; + + long increment = ByteBufferUtil.toLong(value); + if (increment == Long.MIN_VALUE) + throw new InvalidRequestException("The negation of " + increment + " overflows supported counter precision (signed 8 bytes integer)"); + + params.addCounter(column, -increment, CellPath.create(key)); + } + } + public static class Putter extends Operation { public Putter(ColumnDefinition column, Term t) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce8c9b55/src/java/org/apache/cassandra/cql3/MultiColumnRelation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/MultiColumnRelation.java b/src/java/org/apache/cassandra/cql3/MultiColumnRelation.java index 143106d..1bfac3f 100644 --- a/src/java/org/apache/cassandra/cql3/MultiColumnRelation.java +++ b/src/java/org/apache/cassandra/cql3/MultiColumnRelation.java @@ -27,6 +27,7 @@ import 
org.apache.cassandra.cql3.Term.MultiColumnRaw; import org.apache.cassandra.cql3.Term.Raw; import org.apache.cassandra.cql3.restrictions.MultiColumnRestriction; import org.apache.cassandra.cql3.restrictions.Restriction; +import org.apache.cassandra.cql3.restrictions.SingleColumnRestriction; import org.apache.cassandra.cql3.statements.Bound; import org.apache.cassandra.exceptions.InvalidRequestException; @@ -162,7 +163,7 @@ public class MultiColumnRelation extends Relation boolean inclusive) throws InvalidRequestException { List<ColumnDefinition> receivers = receivers(cfm); - Term term = toTerm(receivers(cfm), getValue(), cfm.ksName, boundNames); + Term term = toTerm(receivers, getValue(), cfm.ksName, boundNames); return new MultiColumnRestriction.SliceRestriction(receivers, bound, inclusive, term); } @@ -238,4 +239,65 @@ public class MultiColumnRelation extends Relation .append(valuesOrMarker) .toString(); } + + @Override + public Relation toSuperColumnAdapter() + { + return new SuperColumnMultiColumnRelation(entities, relationType, valuesOrMarker, inValues, inMarker); + } + + /** + * Required for SuperColumn compatibility, in order to map the SuperColumn key restrictions from the regular + * column to the collection key one. + */ + private class SuperColumnMultiColumnRelation extends MultiColumnRelation + { + private SuperColumnMultiColumnRelation(List<ColumnIdentifier.Raw> entities, Operator relationType, MultiColumnRaw valuesOrMarker, List<? 
extends MultiColumnRaw> inValues, Tuples.INRaw inMarker) + { + super(entities, relationType, valuesOrMarker, inValues, inMarker); + } + + @Override + protected Restriction newSliceRestriction(CFMetaData cfm, + VariableSpecifications boundNames, + Bound bound, + boolean inclusive) throws InvalidRequestException + { + assert cfm.isSuper() && cfm.isDense(); + List<ColumnDefinition> receivers = receivers(cfm); + Term term = toTerm(receivers, getValue(), cfm.ksName, boundNames); + return new SingleColumnRestriction.SuperColumnMultiSliceRestriction(receivers.get(0), bound, inclusive, term); + } + + @Override + protected Restriction newEQRestriction(CFMetaData cfm, + VariableSpecifications boundNames) throws InvalidRequestException + { + assert cfm.isSuper() && cfm.isDense(); + List<ColumnDefinition> receivers = receivers(cfm); + Term term = toTerm(receivers, getValue(), cfm.ksName, boundNames); + return new SingleColumnRestriction.SuperColumnMultiEQRestriction(receivers.get(0), term); + } + + @Override + protected List<ColumnDefinition> receivers(CFMetaData cfm) throws InvalidRequestException + { + assert cfm.isSuper() && cfm.isDense(); + List<ColumnDefinition> names = new ArrayList<>(getEntities().size()); + + for (ColumnIdentifier.Raw raw : getEntities()) + { + ColumnDefinition def = toColumnDefinition(cfm, raw); + + checkTrue(def.isClusteringColumn() || + cfm.isSuperColumnKeyColumn(def), + "Multi-column relations can only be applied to clustering columns but was applied to: %s", def.name); + + checkFalse(names.contains(def), "Column \"%s\" appeared twice in a relation: %s", def.name, this); + + names.add(def); + } + return names; + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce8c9b55/src/java/org/apache/cassandra/cql3/Operation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/Operation.java b/src/java/org/apache/cassandra/cql3/Operation.java index ecd37c4..4b8d5ba 100644 --- 
a/src/java/org/apache/cassandra/cql3/Operation.java +++ b/src/java/org/apache/cassandra/cql3/Operation.java @@ -192,6 +192,11 @@ public abstract class Operation // it's stupid and 2) the result would seem random to the user. return false; } + + public Term.Raw value() + { + return value; + } } public static class SetElement implements RawUpdate @@ -241,6 +246,72 @@ public abstract class Operation } } + // Currently only used internally counters support in SuperColumn families. + // Addition on the element level inside the collections are otherwise not supported in the CQL. + public static class ElementAddition implements RawUpdate + { + private final Term.Raw selector; + private final Term.Raw value; + + public ElementAddition(Term.Raw selector, Term.Raw value) + { + this.selector = selector; + this.value = value; + } + + public Operation prepare(String keyspace, ColumnDefinition receiver) throws InvalidRequestException + { + assert receiver.type instanceof MapType; + Term k = selector.prepare(keyspace, Maps.keySpecOf(receiver)); + Term v = value.prepare(keyspace, Maps.valueSpecOf(receiver)); + + return new Maps.AdderByKey(receiver, v, k); + } + + protected String toString(ColumnSpecification column) + { + return String.format("%s = %s + %s", column.name, column.name, value); + } + + public boolean isCompatibleWith(RawUpdate other) + { + return !(other instanceof SetValue); + } + } + + // Currently only used internally counters support in SuperColumn families. + // Addition on the element level inside the collections are otherwise not supported in the CQL. 
+ public static class ElementSubtraction implements RawUpdate + { + private final Term.Raw selector; + private final Term.Raw value; + + public ElementSubtraction(Term.Raw selector, Term.Raw value) + { + this.selector = selector; + this.value = value; + } + + public Operation prepare(String keyspace, ColumnDefinition receiver) throws InvalidRequestException + { + assert receiver.type instanceof MapType; + Term k = selector.prepare(keyspace, Maps.keySpecOf(receiver)); + Term v = value.prepare(keyspace, Maps.valueSpecOf(receiver)); + + return new Maps.SubtracterByKey(receiver, v, k); + } + + protected String toString(ColumnSpecification column) + { + return String.format("%s = %s + %s", column.name, column.name, value); + } + + public boolean isCompatibleWith(RawUpdate other) + { + return !(other instanceof SetValue); + } + } + public static class Addition implements RawUpdate { private final Term.Raw value; @@ -284,6 +355,11 @@ public abstract class Operation { return !(other instanceof SetValue); } + + public Term.Raw value() + { + return value; + } } public static class Substraction implements RawUpdate @@ -332,6 +408,11 @@ public abstract class Operation { return !(other instanceof SetValue); } + + public Term.Raw value() + { + return value; + } } public static class Prepend implements RawUpdate http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce8c9b55/src/java/org/apache/cassandra/cql3/Relation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/Relation.java b/src/java/org/apache/cassandra/cql3/Relation.java index 334464f..a88932e 100644 --- a/src/java/org/apache/cassandra/cql3/Relation.java +++ b/src/java/org/apache/cassandra/cql3/Relation.java @@ -26,6 +26,7 @@ import org.apache.cassandra.cql3.restrictions.Restriction; import org.apache.cassandra.cql3.statements.Bound; import org.apache.cassandra.exceptions.InvalidRequestException; import 
org.apache.cassandra.exceptions.UnrecognizedEntityException; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest; @@ -148,6 +149,15 @@ public abstract class Relation { } /** + * Required for SuperColumn compatibility, creates an adapter Relation that remaps all restrictions required for + * SuperColumn tables. + */ + public Relation toSuperColumnAdapter() + { + throw invalidRequest("Unsupported operation (" + this + ") on super column family"); + } + + /** * Creates a new EQ restriction instance. * * @param cfm the Column Family meta data http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce8c9b55/src/java/org/apache/cassandra/cql3/SingleColumnRelation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/SingleColumnRelation.java b/src/java/org/apache/cassandra/cql3/SingleColumnRelation.java index 05ba42d..455ae0c 100644 --- a/src/java/org/apache/cassandra/cql3/SingleColumnRelation.java +++ b/src/java/org/apache/cassandra/cql3/SingleColumnRelation.java @@ -40,7 +40,7 @@ import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue; * a value (term). For example, <key> > "start" or "colname1" = "somevalue". * */ -public final class SingleColumnRelation extends Relation +public class SingleColumnRelation extends Relation { private final ColumnIdentifier.Raw entity; private final Term.Raw mapKey; @@ -303,4 +303,78 @@ public final class SingleColumnRelation extends Relation { return isEQ() || (isIN() && inValues != null && inValues.size() == 1); } + + @Override + public Relation toSuperColumnAdapter() + { + return new SuperColumnSingleColumnRelation(entity, mapKey, relationType, value); + } + + /** + * Required for SuperColumn compatibility, in order to map the SuperColumn key restrictions from the regular + * column to the collection key one. 
+ */ + private class SuperColumnSingleColumnRelation extends SingleColumnRelation + { + SuperColumnSingleColumnRelation(ColumnIdentifier.Raw entity, Raw mapKey, Operator type, Raw value) + { + super(entity, mapKey, type, value, inValues); + } + + @Override + public Restriction newSliceRestriction(CFMetaData cfm, + VariableSpecifications boundNames, + Bound bound, + boolean inclusive) throws InvalidRequestException + { + ColumnDefinition columnDef = toColumnDefinition(cfm, entity); + if (cfm.isSuperColumnKeyColumn(columnDef)) + { + Term term = toTerm(toReceivers(columnDef, cfm.isDense()), value, cfm.ksName, boundNames); + return new SingleColumnRestriction.SuperColumnKeySliceRestriction(cfm.superColumnKeyColumn(), bound, inclusive, term); + } + else + { + return super.newSliceRestriction(cfm, boundNames, bound, inclusive); + } + } + + @Override + protected Restriction newEQRestriction(CFMetaData cfm, + VariableSpecifications boundNames) throws InvalidRequestException + { + ColumnDefinition columnDef = toColumnDefinition(cfm, entity); + if (cfm.isSuperColumnKeyColumn(columnDef)) + { + Term term = toTerm(toReceivers(columnDef, cfm.isDense()), value, cfm.ksName, boundNames); + return new SingleColumnRestriction.SuperColumnKeyEQRestriction(cfm.superColumnKeyColumn(), term); + } + else + { + return super.newEQRestriction(cfm, boundNames); + } + } + + @Override + protected Restriction newINRestriction(CFMetaData cfm, + VariableSpecifications boundNames) throws InvalidRequestException + { + ColumnDefinition columnDef = toColumnDefinition(cfm, entity); + if (cfm.isSuperColumnKeyColumn(columnDef)) + { + List<? 
extends ColumnSpecification> receivers = Collections.singletonList(cfm.superColumnKeyColumn()); + List<Term> terms = toTerms(receivers, inValues, cfm.ksName, boundNames); + if (terms == null) + { + Term term = toTerm(receivers, value, cfm.ksName, boundNames); + return new SingleColumnRestriction.SuperColumnKeyINRestrictionWithMarkers(cfm.superColumnKeyColumn(), (Lists.Marker) term); + } + return new SingleColumnRestriction.SuperColumnKeyINRestrictionWithValues(cfm.superColumnKeyColumn(), terms); + } + else + { + return super.newINRestriction(cfm, boundNames); + } + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce8c9b55/src/java/org/apache/cassandra/cql3/SuperColumnCompatibility.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/SuperColumnCompatibility.java b/src/java/org/apache/cassandra/cql3/SuperColumnCompatibility.java new file mode 100644 index 0000000..d4c14df --- /dev/null +++ b/src/java/org/apache/cassandra/cql3/SuperColumnCompatibility.java @@ -0,0 +1,765 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.cql3; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.restrictions.Restriction; +import org.apache.cassandra.cql3.restrictions.SingleColumnRestriction; +import org.apache.cassandra.cql3.restrictions.TermSlice; +import org.apache.cassandra.cql3.selection.Selection; +import org.apache.cassandra.cql3.statements.Bound; +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.Columns; +import org.apache.cassandra.db.CompactTables; +import org.apache.cassandra.db.PartitionColumns; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.marshal.AbstractType; +import org.apache.cassandra.db.marshal.MapType; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.db.rows.CellPath; +import org.apache.cassandra.db.rows.ComplexColumnData; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.db.rows.RowIterator; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.utils.Pair; + +import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse; +import static org.apache.cassandra.cql3.statements.RequestValidations.checkNotNull; +import static org.apache.cassandra.cql3.statements.RequestValidations.checkTrue; +import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest; +import static org.apache.cassandra.cql3.statements.SelectStatement.getComponents; + +/** + * Class incapsulating the helper logic to handle SELECT / UPDATE / INSERT special-cases 
related + * to SuperColumn tables in applicable scenarios. + * + * SuperColumn families have a special layout and are represented as a Map internally. These tables + * have two special columns (called `column2` and `value` by default): + * + * * `column2`, {@link CFMetaData#superCfKeyColumn}, a key of the SuperColumn map, exposed as a + * REGULAR column, but stored in schema tables as a CLUSTERING column to make a distinction from + * the SC value column in case of renames. + * * `value`, {@link CFMetaData#superCfValueColumn}, a value of the SuperColumn map, exposed and + * stored as a REGULAR column + * + * These columns have to be translated to this internal representation as key and value, respectively. + * + * In CQL terms, a SuperColumn family is encoded as: + * + * CREATE TABLE super ( + * key [key_validation_class], + * super_column_name [comparator], + * [column_metadata_1] [type_1], + * ..., + * [column_metadata_n] [type_n], + * "" map<[sub_comparator], [default_validation_class]> + * PRIMARY KEY (key, super_column_name) + * ) + * + * In other words, every super column is encoded by a row. That row has one column for each defined + * "column_metadata", but it also has a special map column (whose name is the empty string as this is + * guaranteed to never conflict with a user-defined "column_metadata") which stores the super column + * "dynamic" sub-columns. + * + * On the write path, `column2` and `value` columns are translated to the key and value of the + * underlying map. During the read, the inverse conversion is done. Deletes are converted into + * discards by the key in the underlying map. Counters are handled by translating an update to a + * counter update with a cell path. See {@link SuperColumnRestrictions} for the details. + * + * Since non-dense SuperColumn families do not modify the contents of the internal map through CQL + * and do not expose it via CQL either, reads, writes and deletes are handled normally. 
+ * + * Sidenote: a _dense_ SuperColumn Familiy is the one that has no added REGULAR columns. + */ +public class SuperColumnCompatibility +{ + // We use an empty value for the 1) this can't conflict with a user-defined column and 2) this actually + // validate with any comparator which makes it convenient for columnDefinitionComparator(). + public static final ByteBuffer SUPER_COLUMN_MAP_COLUMN = ByteBufferUtil.EMPTY_BYTE_BUFFER; + public static final String SUPER_COLUMN_MAP_COLUMN_STR = UTF8Type.instance.compose(SUPER_COLUMN_MAP_COLUMN); + + /** + * Dense flag might have been incorrectly set if the node was upgraded from 2.x before CASSANDRA-12373. + * + * For 3.x created tables, the flag is set correctly in ThriftConversion code. + */ + public static boolean recalculateIsDense(Columns columns) + { + return columns.size() == 1 && columns.getComplex(0).name.toString().isEmpty(); + } + + /** + * For _dense_ SuperColumn Families, the supercolumn key column has to be translated to the collection subselection + * query in order to avoid reading an entire collection and then filtering out the results. + */ + public static ColumnFilter getColumnFilter(CFMetaData cfm, QueryOptions queryOptions, SuperColumnRestrictions restrictions) + { + assert cfm.isSuper() && cfm.isDense(); + + ColumnFilter.Builder builder = ColumnFilter.selectionBuilder(); + builder.add(cfm.compactValueColumn()); + + if (restrictions.keySliceRestriction != null) + { + SingleColumnRestriction.SuperColumnKeySliceRestriction restriction = restrictions.keySliceRestriction; + TermSlice slice = restriction.slice; + + ByteBuffer start = slice.hasBound(Bound.START) ? slice.bound(Bound.START).bindAndGet(queryOptions) : null; + ByteBuffer end = slice.hasBound(Bound.END) ? slice.bound(Bound.END).bindAndGet(queryOptions) : null; + + builder.slice(cfm.compactValueColumn(), + start == null ? CellPath.BOTTOM : CellPath.create(start), + end == null ? 
CellPath.TOP : CellPath.create(end)); + } + else if (restrictions.keyEQRestriction != null) + { + SingleColumnRestriction.SuperColumnKeyEQRestriction restriction = restrictions.keyEQRestriction; + ByteBuffer value = restriction.bindValue(queryOptions); + builder.select(cfm.compactValueColumn(), CellPath.create(value)); + } + else if (restrictions.keyINRestriction != null) + { + SingleColumnRestriction.SuperColumnKeyINRestriction cast = restrictions.keyINRestriction; + Set<ByteBuffer> keyINRestrictionValues = new TreeSet<ByteBuffer>(((MapType) cfm.compactValueColumn().type).getKeysType()); + keyINRestrictionValues.addAll(cast.getValues(queryOptions)); + + for (ByteBuffer value : keyINRestrictionValues) + builder.select(cfm.compactValueColumn(), CellPath.create(value)); + } + else if (restrictions.multiEQRestriction != null) + { + SingleColumnRestriction.SuperColumnMultiEQRestriction restriction = restrictions.multiEQRestriction; + ByteBuffer value = restriction.secondValue; + builder.select(cfm.compactValueColumn(), CellPath.create(value)); + } + + return builder.build(); + } + + /** + * For _dense_ SuperColumn Families. + * + * On read path, instead of writing row per map, we have to write a row per key/value pair in map. 
+ * + * For example: + * + * | partition-key | clustering-key | { key1: value1, key2: value2 } | + * + * Will be translated to: + * + * | partition-key | clustering-key | key1 | value1 | + * | partition-key | clustering-key | key2 | value2 | + * + */ + public static void processPartition(CFMetaData cfm, Selection selection, RowIterator partition, Selection.ResultSetBuilder result, int protocolVersion, + SuperColumnRestrictions restrictions, QueryOptions queryOptions) + { + assert cfm.isDense(); + ByteBuffer[] keyComponents = getComponents(cfm, partition.partitionKey()); + + int nowInSeconds = FBUtilities.nowInSeconds(); + while (partition.hasNext()) + { + Row row = partition.next(); + + ComplexColumnData ccd = row.getComplexColumnData(cfm.compactValueColumn()); + + if (ccd == null) + continue; + + Iterator<Cell> cellIter = ccd.iterator(); + + outer: + while (cellIter.hasNext()) + { + Cell cell = cellIter.next(); + ByteBuffer superColumnKey = cell.path().get(0); + + if (restrictions != null) + { + // Slice on SuperColumn key + if (restrictions.keySliceRestriction != null) + { + for (Bound bound : Bound.values()) + { + if (restrictions.keySliceRestriction.hasBound(bound) && + !restrictions.keySliceRestriction.isInclusive(bound)) + { + ByteBuffer excludedValue = restrictions.keySliceRestriction.bindValue(queryOptions); + if (excludedValue.equals(superColumnKey)) + continue outer; + } + } + } + + // Multi-column restriction on clustering+SuperColumn key + if (restrictions.multiSliceRestriction != null && + cfm.comparator.compare(row.clustering(), new Clustering(restrictions.multiSliceRestriction.firstValue)) == 0) + { + AbstractType t = ((MapType) cfm.compactValueColumn().type).getKeysType(); + int cmp = t.compare(superColumnKey, restrictions.multiSliceRestriction.secondValue); + + if ((cmp == 0 && !restrictions.multiSliceRestriction.trueInclusive) || // EQ + (restrictions.multiSliceRestriction.hasBound(Bound.END) && cmp > 0) || // LT + 
(restrictions.multiSliceRestriction.hasBound(Bound.START) && cmp < 0)) // GT + continue outer; + } + } + + result.newRow(protocolVersion); + + for (ColumnDefinition def : selection.getColumns()) + { + if (cfm.isSuperColumnKeyColumn(def)) + { + result.add(superColumnKey); + } + else if (cfm.isSuperColumnValueColumn(def)) + { + result.add(cell, nowInSeconds); + } + else + { + switch (def.kind) + { + case PARTITION_KEY: + result.add(keyComponents[def.position()]); + break; + case CLUSTERING: + result.add(row.clustering().get(def.position())); + break; + case REGULAR: + case STATIC: + throw new AssertionError(String.format("Invalid column '%s' found in SuperColumn table", def.name.toString())); + } + } + } + } + } + } + + /** + * For _dense_ SuperColumn Families. + * + * On the write path, we have to do combine the columns into a key/value pair: + * + * So inserting a row: + * + * | partition-key | clustering-key | key1 | value1 | + * + * Would result into: + * + * | partition-key | clustering-key | {key1: value1} | + * + * or adding / overwriting the value for `key1`. + */ + public static void prepareInsertOperations(CFMetaData cfm, + List<ColumnIdentifier.Raw> columnNames, + WhereClause.Builder whereClause, + List<Term.Raw> columnValues, + VariableSpecifications boundNames, + Operations operations) + { + List<ColumnDefinition> defs = new ArrayList<>(columnNames.size()); + for (int i = 0; i < columnNames.size(); i++) + { + ColumnIdentifier id = columnNames.get(i).prepare(cfm); + defs.add(cfm.getColumnDefinition(id)); + } + + prepareInsertOperations(cfm, defs, boundNames, columnValues, whereClause, operations); + } + + /** + * For _dense_ SuperColumn Families. 
+ * + * {@link #prepareInsertOperations(CFMetaData, List, VariableSpecifications, List, WhereClause.Builder, Operations)}, + * but for INSERT JSON queries + */ + public static void prepareInsertJSONOperations(CFMetaData cfm, + List<ColumnDefinition> defs, + VariableSpecifications boundNames, + Json.Prepared prepared, + WhereClause.Builder whereClause, + Operations operations) + { + List<Term.Raw> columnValues = new ArrayList<>(defs.size()); + for (ColumnDefinition def : defs) + columnValues.add(prepared.getRawTermForColumn(def)); + + prepareInsertOperations(cfm, defs, boundNames, columnValues, whereClause, operations); + } + + private static void prepareInsertOperations(CFMetaData cfm, + List<ColumnDefinition> defs, + VariableSpecifications boundNames, + List<Term.Raw> columnValues, + WhereClause.Builder whereClause, + Operations operations) + { + assert cfm.isDense(); + assert defs.size() == columnValues.size(); + + Term.Raw superColumnKey = null; + Term.Raw superColumnValue = null; + + for (int i = 0, size = defs.size(); i < size; i++) + { + ColumnDefinition def = defs.get(i); + Term.Raw raw = columnValues.get(i); + + if (cfm.isSuperColumnKeyColumn(def)) + { + superColumnKey = raw; + collectMarkerSpecifications(raw, boundNames, def); + } + else if (cfm.isSuperColumnValueColumn(def)) + { + superColumnValue = raw; + collectMarkerSpecifications(raw, boundNames, def); + } + else if (def.isPrimaryKeyColumn()) + { + whereClause.add(new SingleColumnRelation(new ColumnIdentifier.ColumnIdentifierValue(def.name), Operator.EQ, raw)); + } + else + { + throw invalidRequest("Invalid column {} in where clause"); + } + } + + checkTrue(superColumnValue != null, + "Column value is mandatory for SuperColumn tables"); + checkTrue(superColumnKey != null, + "Column key is mandatory for SuperColumn tables"); + + Operation operation = new Operation.SetElement(superColumnKey, superColumnValue).prepare(cfm.ksName, cfm.compactValueColumn()); + operations.add(operation); + } + + /** + * 
Collect the marker specifications for the bound columns manually, since the operations on a column are + * converted to the operations on the collection element. + */ + private static void collectMarkerSpecifications(Term.Raw raw, VariableSpecifications boundNames, ColumnDefinition def) + { + if (raw instanceof AbstractMarker.Raw) + boundNames.add(((AbstractMarker.Raw) raw).bindIndex(), def); + } + + /** + * For _dense_ SuperColumn Families. + * + * During UPDATE operation, the update by clustering (with correponding relation in WHERE clause) + * has to be substituted with an update to the map that backs the given SuperColumn. + * + * For example, an update such as: + * + * UPDATE ... SET value = 'value1' WHERE key = 'pk' AND column1 = 'ck' AND column2 = 'mk' + * + * Will update the value under key 'mk' in the map, backing the SuperColumn, located in the row + * with clustering 'ck' in the partition with key 'pk'. + */ + public static WhereClause prepareUpdateOperations(CFMetaData cfm, + WhereClause whereClause, + List<Pair<ColumnIdentifier.Raw, Operation.RawUpdate>> updates, + VariableSpecifications boundNames, + Operations operations) + { + assert cfm.isDense(); + Term.Raw superColumnKey = null; + Term.Raw superColumnValue = null; + + List<Relation> newRelations = new ArrayList<>(whereClause.relations.size()); + for (int i = 0; i < whereClause.relations.size(); i++) + { + SingleColumnRelation relation = (SingleColumnRelation) whereClause.relations.get(i); + ColumnIdentifier id = relation.getEntity().prepare(cfm); + ColumnDefinition def = cfm.getColumnDefinition(id); + + if (cfm.isSuperColumnKeyColumn(def)) + { + superColumnKey = relation.getValue(); + collectMarkerSpecifications(superColumnKey, boundNames, def); + } + else + { + newRelations.add(relation); + } + } + + checkTrue(superColumnKey != null, + "Column key is mandatory for SuperColumn tables"); + + for (Pair<ColumnIdentifier.Raw, Operation.RawUpdate> entry : updates) + { + ColumnIdentifier id = 
entry.left.prepare(cfm); + ColumnDefinition def = cfm.getColumnDefinition(id); + + if (!cfm.isSuperColumnValueColumn(def)) + throw invalidRequest("Column `%s` of type `%s` found in SET part", def.name, def.type.asCQL3Type()); + + Operation operation; + + if (entry.right instanceof Operation.Addition) + { + Operation.Addition op = (Operation.Addition) entry.right; + superColumnValue = op.value(); + + operation = new Operation.ElementAddition(superColumnKey, superColumnValue).prepare(cfm.ksName, cfm.compactValueColumn()); + } + else if (entry.right instanceof Operation.Substraction) + { + Operation.Substraction op = (Operation.Substraction) entry.right; + superColumnValue = op.value(); + + operation = new Operation.ElementSubtraction(superColumnKey, superColumnValue).prepare(cfm.ksName, cfm.compactValueColumn()); + } + else if (entry.right instanceof Operation.SetValue) + { + Operation.SetValue op = (Operation.SetValue) entry.right; + superColumnValue = op.value(); + + operation = new Operation.SetElement(superColumnKey, superColumnValue).prepare(cfm.ksName, cfm.compactValueColumn()); + } + else + { + throw invalidRequest("Invalid operation `%s` on column `%s` of type `%s` found in SET part", entry.right, def.name, def.type.asCQL3Type()); + } + + collectMarkerSpecifications(superColumnValue, boundNames, def); + operations.add(operation); + } + + checkTrue(superColumnValue != null, + "Column value is mandatory for SuperColumn tables"); + + return newRelations.size() != whereClause.relations.size() ? whereClause.copy(newRelations) : whereClause; + } + + /** + * Rebuilds LWT conditions on SuperColumn _value_ column. + * + * Conditions have to be changed to correspond the internal representation of SuperColumn value, since it's not + * a separate column, but a value in a hidden compact value column. 
+ */ + public static Conditions rebuildLWTColumnConditions(Conditions conditions, CFMetaData cfm, WhereClause whereClause) + { + if (conditions.isEmpty() || conditions.isIfExists() || conditions.isIfNotExists()) + return conditions; + + ColumnConditions.Builder builder = ColumnConditions.newBuilder(); + Collection<ColumnCondition> columnConditions = ((ColumnConditions) conditions).columnConditions(); + + Pair<ColumnDefinition, Relation> superColumnKeyRelation = SuperColumnCompatibility.getSuperColumnKeyRelation(whereClause.relations, cfm); + + checkNotNull(superColumnKeyRelation, + "Lightweight transactions on SuperColumn tables are only supported with supplied SuperColumn key"); + + for (ColumnCondition columnCondition : columnConditions) + { + checkTrue(cfm.isSuperColumnValueColumn(columnCondition.column), + "Lightweight transactions are only supported on the value column of SuperColumn tables"); + + Term.Raw value = superColumnKeyRelation.right.getValue(); + Term collectionElemnt = value instanceof AbstractMarker.Raw ? + new Constants.Marker(((AbstractMarker.Raw) value).bindIndex(), + superColumnKeyRelation.left) : + value.prepare(cfm.ksName, superColumnKeyRelation.left); + builder.add(ColumnCondition.condition(cfm.compactValueColumn(), + collectionElemnt, + columnCondition.value(), columnCondition.operator)); + } + + return builder.build(); + } + + /** + * Returns a relation on the SuperColumn key + */ + private static Pair<ColumnDefinition, Relation> getSuperColumnKeyRelation(List<Relation> relations, CFMetaData cfm) + { + for (int i = 0; i < relations.size(); i++) + { + SingleColumnRelation relation = (SingleColumnRelation) relations.get(i); + ColumnIdentifier id = relation.getEntity().prepare(cfm); + ColumnDefinition def = cfm.getColumnDefinition(id); + + if (cfm.isSuperColumnKeyColumn(def)) + return Pair.create(def, relation); + } + return null; + } + + /** + * For _dense_ SuperColumn Families. 
+ * + * Delete, when the "regular" columns are present, have to be translated into + * deletion of value in the internal map by key. + * + * For example, delete such as: + * + * DELETE FROM ... WHERE key = 'pk' AND column1 = 'ck' AND column2 = 'mk' + * + * Will delete a value under 'mk' from the map, located in the row with clustering key 'ck' in the partition + * with key 'pk'. + */ + public static WhereClause prepareDeleteOperations(CFMetaData cfm, + WhereClause whereClause, + VariableSpecifications boundNames, + Operations operations) + { + assert cfm.isDense(); + List<Relation> newRelations = new ArrayList<>(whereClause.relations.size()); + + for (int i = 0; i < whereClause.relations.size(); i++) + { + Relation orig = whereClause.relations.get(i); + + checkFalse(orig.isMultiColumn(), + "Multi-column relations cannot be used in WHERE clauses for UPDATE and DELETE statements: %s", orig); + checkFalse(orig.onToken(), + "Token relations cannot be used in WHERE clauses for UPDATE and DELETE statements: %s", orig); + + SingleColumnRelation relation = (SingleColumnRelation) orig; + ColumnIdentifier id = relation.getEntity().prepare(cfm); + ColumnDefinition def = cfm.getColumnDefinition(id); + + if (cfm.isSuperColumnKeyColumn(def)) + { + Term.Raw value = relation.getValue(); + + if (value instanceof AbstractMarker.Raw) + boundNames.add(((AbstractMarker.Raw) value).bindIndex(), def); + + Operation operation = new Maps.DiscarderByKey(cfm.compactValueColumn(), value.prepare(cfm.ksName, def)); + operations.add(operation); + } + else + { + newRelations.add(relation); + } + } + + return newRelations.size() != whereClause.relations.size() ? 
whereClause.copy(newRelations) : whereClause; + } + + /** + * Create a column name generator for SuperColumns + */ + public static CompactTables.DefaultNames columnNameGenerator(List<ColumnDefinition> partitionKeyColumns, + List<ColumnDefinition> clusteringColumns, + PartitionColumns partitionColumns) + { + Set<String> names = new HashSet<>(); + // If the clustering column was renamed, the supercolumn key's default nname still can't be `column1` (SuperColumn + // key renames are handled separately by looking up an existing column). + names.add("column1"); + for (ColumnDefinition columnDefinition: partitionKeyColumns) + names.add(columnDefinition.name.toString()); + for (ColumnDefinition columnDefinition: clusteringColumns) + names.add(columnDefinition.name.toString()); + for (ColumnDefinition columnDefinition: partitionColumns) + names.add(columnDefinition.name.toString()); + + return CompactTables.defaultNameGenerator(names); + } + + /** + * Find a SuperColumn key column if it's available (for example, when it was renamed) or create one with a default name. + */ + public static ColumnDefinition getSuperCfKeyColumn(CFMetaData cfm, List<ColumnDefinition> clusteringColumns, CompactTables.DefaultNames defaultNames) + { + assert cfm.isDense(); + + MapType mapType = (MapType) cfm.compactValueColumn().type; + // Pre CASSANDRA-12373 3.x-created supercolumn family + if (clusteringColumns.size() == 1) + { + // create a new one with a default name + ColumnIdentifier identifier = ColumnIdentifier.getInterned(defaultNames.defaultClusteringName(), true); + return new ColumnDefinition(cfm.ksName, cfm.cfName, identifier, mapType.getKeysType(), ColumnDefinition.NO_POSITION, ColumnDefinition.Kind.REGULAR); + } + + // Upgrade path: table created in 2.x, handle pre-created columns and/or renames. 
+ assert clusteringColumns.size() == 2 : clusteringColumns; + ColumnDefinition cd = clusteringColumns.get(1); + + assert cd.type.equals(mapType.getKeysType()) : cd.type + " != " + mapType.getKeysType(); + return new ColumnDefinition(cfm.ksName, cfm.cfName, cd.name, mapType.getKeysType(), ColumnDefinition.NO_POSITION, ColumnDefinition.Kind.REGULAR); + } + + /** + * Find a SuperColumn value column if it's available (for example, when it was renamed) or create one with a default name. + */ + public static ColumnDefinition getSuperCfValueColumn(CFMetaData cfm, PartitionColumns partitionColumns, ColumnDefinition superCfKeyColumn, CompactTables.DefaultNames defaultNames) + { + assert cfm.isDense(); + + MapType mapType = (MapType) cfm.compactValueColumn().type; + for (ColumnDefinition def: partitionColumns.regulars) + { + if (!def.name.bytes.equals(SUPER_COLUMN_MAP_COLUMN) && def.type.equals(mapType.getValuesType()) && !def.equals(superCfKeyColumn)) + return def; + } + + ColumnIdentifier identifier = ColumnIdentifier.getInterned(defaultNames.defaultCompactValueName(), true); + return new ColumnDefinition(cfm.ksName, cfm.cfName, identifier, mapType.getValuesType(), ColumnDefinition.NO_POSITION, ColumnDefinition.Kind.REGULAR); + } + + /** + * SuperColumn key is stored in {@link CFMetaData#columnMetadata} as a clustering column (to make sure we can make + * a distinction between the SuperColumn key and SuperColumn value columns, especially when they have the same type + * and were renamed), but exposed as {@link CFMetaData#superCfKeyColumn} as a regular column to be compatible with + * the storage engine. + * + * This remapping is necessary to facilitate the column metadata part. 
+ */ + public static ColumnDefinition getSuperCfSschemaRepresentation(ColumnDefinition superCfKeyColumn) + { + return new ColumnDefinition(superCfKeyColumn.ksName, superCfKeyColumn.cfName, superCfKeyColumn.name, superCfKeyColumn.type, 1, ColumnDefinition.Kind.CLUSTERING); + } + + public static boolean isSuperColumnMapColumn(ColumnDefinition column) + { + return column.isRegular() && column.name.bytes.equals(SuperColumnCompatibility.SUPER_COLUMN_MAP_COLUMN); + } + + public static ColumnDefinition getCompactValueColumn(PartitionColumns columns) + { + for (ColumnDefinition column : columns.regulars) + { + if (isSuperColumnMapColumn(column)) + return column; + } + throw new AssertionError("Invalid super column table definition, no 'dynamic' map column"); + } + + /** + * Restrictions are the trickiest part of the SuperColumn integration. + * See specific docs on each field. For the purpose of this doc, the "default" column names are used, + * `column2` and `value`. Detailed description and semantics of these fields can be found in this class' + * header comment. + */ + public static class SuperColumnRestrictions + { + /** + * Restrictions in the form of: + * ... AND (column1, column2) > ('value1', 1) + * Multi-column restrictions. `column1` will be handled normally by the clustering bounds, + * and `column2` value has to be "saved" and filtered out in `processPartition`, as there's no + * direct mapping of multi-column restrictions to clustering + cell path. The first row + * is special-cased to make sure the semantics of multi-column restrictions are preserved. + */ + private final SingleColumnRestriction.SuperColumnMultiSliceRestriction multiSliceRestriction; + + /** + * Restrictions in the form of: + * ... 
AND (column1, column2) = ('value1', 1) + * Multi-column restriction with EQ does have a direct mapping: `column1` will be handled + * normally by the clustering bounds, and the `column2` will be special-cased by the + * {@link #getColumnFilter(CFMetaData, QueryOptions, SuperColumnRestrictions)} as a collection path lookup. + */ + private final SingleColumnRestriction.SuperColumnMultiEQRestriction multiEQRestriction; + + /** + * Restrictions in the form of: + * ... AND column2 >= 5 + * For non-filtering cases (when the preceding clustering column and a partition key are + * restricted), will be handled in {@link #getColumnFilter(CFMetaData, QueryOptions, SuperColumnRestrictions)} + * like an inclusive bounds lookup. + * + * For the restrictions taking a form of + * ... AND column2 > 5 + * (non-inclusive ones), the items that match `=` will be filtered out + * by {@link #processPartition(CFMetaData, Selection, RowIterator, Selection.ResultSetBuilder, int, SuperColumnRestrictions, QueryOptions)} + * + * Unfortunately, there are no good ways to do it other than here: + * {@link RowFilter} can't be used in this case, since the complex collection cells are not yet rows by that + * point. + * {@link ColumnFilter} (which is used for inclusive slices) can't be changed to support exclusive slices as it would + * require a protocol change in order to add a Kind. So exclusive slices are a combination of inclusive plus + * an ad-hoc filter. + */ + private final SingleColumnRestriction.SuperColumnKeySliceRestriction keySliceRestriction; + + /** + * Restrictions in the form of: + * ... 
AND column2 IN (1, 2, 3) + * For non-filtering cases (when the preceding clustering column and a partition key are + * restricted), are handled in {@link #getColumnFilter(CFMetaData, QueryOptions, SuperColumnRestrictions)} by + * adding multiple collection paths to the {@link ColumnFilter} + */ + private final SingleColumnRestriction.SuperColumnKeyINRestriction keyINRestriction; + + /** + * Restrictions in the form of: + * ... AND column2 = 1 + * For non-filtering cases (when the preceding clustering column and a partition key are + * restricted), will be handled by converting the restriction to the column filter on + * the collection key in {@link #getColumnFilter(CFMetaData, QueryOptions, SuperColumnRestrictions)} + */ + private final SingleColumnRestriction.SuperColumnKeyEQRestriction keyEQRestriction; + + public SuperColumnRestrictions(Iterator<Restriction> restrictions) + { + // In order to keep the fields final, assignments have to be done outside the loop + SingleColumnRestriction.SuperColumnMultiSliceRestriction multiSliceRestriction = null; + SingleColumnRestriction.SuperColumnKeySliceRestriction keySliceRestriction = null; + SingleColumnRestriction.SuperColumnKeyINRestriction keyINRestriction = null; + SingleColumnRestriction.SuperColumnMultiEQRestriction multiEQRestriction = null; + SingleColumnRestriction.SuperColumnKeyEQRestriction keyEQRestriction = null; + + while (restrictions.hasNext()) + { + Restriction restriction = restrictions.next(); + + if (restriction instanceof SingleColumnRestriction.SuperColumnMultiSliceRestriction) + multiSliceRestriction = (SingleColumnRestriction.SuperColumnMultiSliceRestriction) restriction; + else if (restriction instanceof SingleColumnRestriction.SuperColumnKeySliceRestriction) + keySliceRestriction = (SingleColumnRestriction.SuperColumnKeySliceRestriction) restriction; + else if (restriction instanceof SingleColumnRestriction.SuperColumnKeyINRestriction) + keyINRestriction = 
(SingleColumnRestriction.SuperColumnKeyINRestriction) restriction; + else if (restriction instanceof SingleColumnRestriction.SuperColumnMultiEQRestriction) + multiEQRestriction = (SingleColumnRestriction.SuperColumnMultiEQRestriction) restriction; + else if (restriction instanceof SingleColumnRestriction.SuperColumnKeyEQRestriction) + keyEQRestriction = (SingleColumnRestriction.SuperColumnKeyEQRestriction) restriction; + } + + this.multiSliceRestriction = multiSliceRestriction; + this.keySliceRestriction = keySliceRestriction; + this.keyINRestriction = keyINRestriction; + this.multiEQRestriction = multiEQRestriction; + this.keyEQRestriction = keyEQRestriction; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce8c9b55/src/java/org/apache/cassandra/cql3/UpdateParameters.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/UpdateParameters.java b/src/java/org/apache/cassandra/cql3/UpdateParameters.java index d070f61..7d09506 100644 --- a/src/java/org/apache/cassandra/cql3/UpdateParameters.java +++ b/src/java/org/apache/cassandra/cql3/UpdateParameters.java @@ -125,7 +125,7 @@ public class UpdateParameters // the "compact" one. As such, deleting the row or deleting that single cell is equivalent. We favor the later however // because that makes it easier when translating back to the old format layout (for thrift and pre-3.0 backward // compatibility) as we don't have to special case for the row deletion. This is also in line with what we used to do pre-3.0. 
- if (metadata.isCompactTable() && builder.clustering() != Clustering.STATIC_CLUSTERING) + if (metadata.isCompactTable() && builder.clustering() != Clustering.STATIC_CLUSTERING && !metadata.isSuper()) addTombstone(metadata.compactValueColumn()); else builder.addRowDeletion(Row.Deletion.regular(deletionTime)); @@ -156,6 +156,11 @@ public class UpdateParameters public void addCounter(ColumnDefinition column, long increment) throws InvalidRequestException { + addCounter(column, increment, null); + } + + public void addCounter(ColumnDefinition column, long increment, CellPath path) throws InvalidRequestException + { assert ttl == LivenessInfo.NO_TTL; // Because column is a counter, we need the value to be a CounterContext. However, we're only creating a @@ -170,7 +175,7 @@ public class UpdateParameters // // We set counterid to a special value to differentiate between regular pre-2.0 local shards from pre-2.1 era // and "counter update" temporary state cells. Please see CounterContext.createUpdate() for further details. 
- builder.addCell(BufferCell.live(metadata, column, timestamp, CounterContext.instance().createUpdate(increment))); + builder.addCell(BufferCell.live(metadata, column, timestamp, CounterContext.instance().createUpdate(increment), path)); } public void setComplexDeletionTime(ColumnDefinition column) http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce8c9b55/src/java/org/apache/cassandra/cql3/WhereClause.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/WhereClause.java b/src/java/org/apache/cassandra/cql3/WhereClause.java index 9d4e51a..c56c8e0 100644 --- a/src/java/org/apache/cassandra/cql3/WhereClause.java +++ b/src/java/org/apache/cassandra/cql3/WhereClause.java @@ -34,9 +34,13 @@ public final class WhereClause private WhereClause(Builder builder) { - this.relations = builder.relations.build(); - this.expressions = builder.expressions.build(); + this(builder.relations.build(), builder.expressions.build()); + } + private WhereClause(List<Relation> relations, List<CustomIndexExpression> expressions) + { + this.relations = relations; + this.expressions = expressions; } public static WhereClause empty() @@ -44,6 +48,11 @@ public final class WhereClause return EMPTY; } + public WhereClause copy(List<Relation> newRelations) + { + return new WhereClause(newRelations, expressions); + } + public boolean containsCustomExpressions() { return !expressions.isEmpty(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/ce8c9b55/src/java/org/apache/cassandra/cql3/restrictions/PrimaryKeyRestrictionSet.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/restrictions/PrimaryKeyRestrictionSet.java b/src/java/org/apache/cassandra/cql3/restrictions/PrimaryKeyRestrictionSet.java index a5f4a24..860d3f0 100644 --- a/src/java/org/apache/cassandra/cql3/restrictions/PrimaryKeyRestrictionSet.java +++ 
b/src/java/org/apache/cassandra/cql3/restrictions/PrimaryKeyRestrictionSet.java @@ -31,8 +31,6 @@ import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.index.SecondaryIndexManager; import org.apache.cassandra.utils.btree.BTreeSet; -import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest; - /** * A set of single column restrictions on a primary key part (partition key or clustering key). */ --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
