Implement virtual keyspace interface patch by Benjamin Lehrer, Chris Lohfink, and Aleksey Yeschenko for CASSANDRA-7622
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0d464cd2 Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0d464cd2 Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0d464cd2 Branch: refs/heads/trunk Commit: 0d464cd25ffbb5734f96c3082f9cc35011de3667 Parents: 3b6c938 Author: Chris Lohfink <clohf...@apple.com> Authored: Wed May 16 23:07:04 2018 +0100 Committer: Aleksey Yeshchenko <alek...@apple.com> Committed: Fri May 18 16:45:45 2018 +0100 ---------------------------------------------------------------------- CHANGES.txt | 1 + bin/cqlsh.py | 84 ++++- .../ClusteringColumnRestrictions.java | 8 +- .../restrictions/MultiColumnRestriction.java | 14 +- .../PartitionKeySingleRestrictionSet.java | 6 +- .../cql3/restrictions/Restriction.java | 10 +- .../cql3/restrictions/RestrictionSet.java | 10 +- .../restrictions/RestrictionSetWrapper.java | 15 +- .../restrictions/SingleColumnRestriction.java | 24 +- .../restrictions/StatementRestrictions.java | 29 +- .../cql3/restrictions/TokenFilter.java | 10 +- .../cql3/restrictions/TokenRestriction.java | 6 +- .../cql3/statements/AlterKeyspaceStatement.java | 2 + .../cql3/statements/AlterTableStatement.java | 3 + .../cql3/statements/BatchStatement.java | 56 ++- .../cql3/statements/BatchUpdatesCollector.java | 58 ++- .../cql3/statements/CQL3CasRequest.java | 21 +- .../statements/CreateAggregateStatement.java | 7 +- .../statements/CreateFunctionStatement.java | 6 +- .../cql3/statements/CreateIndexStatement.java | 3 + .../cql3/statements/CreateTableStatement.java | 5 +- .../cql3/statements/CreateTriggerStatement.java | 2 + .../cql3/statements/CreateTypeStatement.java | 2 + .../cql3/statements/CreateViewStatement.java | 4 +- .../cql3/statements/DeleteStatement.java | 2 + .../cql3/statements/DropKeyspaceStatement.java | 4 + .../cql3/statements/DropTableStatement.java | 3 + .../cql3/statements/ModificationStatement.java | 12 +- .../cql3/statements/SelectStatement.java | 31 +- .../statements/SingleTableUpdatesCollector.java | 8 +- .../cql3/statements/TruncateStatement.java | 10 + .../apache/cassandra/db/AbstractReadQuery.java | 116 ++++++ src/java/org/apache/cassandra/db/Keyspace.java | 2 + .../cassandra/db/PartitionRangeReadCommand.java | 38 +- .../cassandra/db/PartitionRangeReadQuery.java | 93 +++++ .../org/apache/cassandra/db/ReadCommand.java | 100 +---- src/java/org/apache/cassandra/db/ReadQuery.java | 184 ++++++--- .../db/SinglePartitionReadCommand.java | 178 ++------- .../cassandra/db/SinglePartitionReadQuery.java | 290 +++++++++++++++ .../db/VirtualTablePartitionRangeReadQuery.java | 113 ++++++ .../cassandra/db/VirtualTableReadQuery.java | 65 ++++ .../VirtualTableSinglePartitionReadQuery.java | 194 ++++++++++ .../cassandra/db/filter/ColumnFilter.java | 2 + .../db/partitions/PartitionIterators.java | 10 +- .../db/partitions/PartitionUpdate.java | 3 +- .../db/virtual/AbstractVirtualTable.java | 221 +++++++++++ .../cassandra/db/virtual/SimpleDataSet.java | 191 ++++++++++ .../db/virtual/SystemViewsKeyspace.java | 32 ++ .../cassandra/db/virtual/VirtualKeyspace.java | 58 +++ .../db/virtual/VirtualKeyspaceRegistry.java | 77 ++++ .../cassandra/db/virtual/VirtualMutation.java | 111 ++++++ .../db/virtual/VirtualSchemaKeyspace.java | 149 ++++++++ .../cassandra/db/virtual/VirtualTable.java | 74 ++++ .../apache/cassandra/index/IndexRegistry.java | 69 ++++ .../index/internal/CassandraIndex.java | 1 + .../cassandra/schema/KeyspaceMetadata.java | 39 +- .../org/apache/cassandra/schema/Schema.java | 19 +- .../apache/cassandra/schema/SchemaKeyspace.java | 2 +- .../apache/cassandra/schema/TableMetadata.java | 54 +-- .../apache/cassandra/service/CASRequest.java | 4 +- .../cassandra/service/CassandraDaemon.java | 5 + .../apache/cassandra/service/ClientState.java | 5 + .../apache/cassandra/service/StorageProxy.java | 18 +- .../cassandra/service/StorageService.java | 18 +- .../service/pager/AbstractQueryPager.java | 30 +- .../service/pager/MultiPartitionPager.java | 46 +-- .../service/pager/PartitionRangeQueryPager.java | 51 ++- .../service/pager/SinglePartitionPager.java | 33 +- .../cassandra/cache/CacheProviderTest.java | 1 + .../org/apache/cassandra/cql3/CQLTester.java | 5 + .../validation/entities/VirtualTableTest.java | 372 +++++++++++++++++++ 71 files changed, 2936 insertions(+), 593 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 5657567..e7e5ecb 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 4.0 + * Implement virtual keyspace interface (CASSANDRA-7622) * nodetool import cleanup and improvements (CASSANDRA-14417) * Bump jackson version to >= 2.9.5 (CASSANDRA-14427) * Allow nodetool toppartitions without specifying table (CASSANDRA-14360) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/bin/cqlsh.py ---------------------------------------------------------------------- diff --git a/bin/cqlsh.py b/bin/cqlsh.py index 3055110..e8e380f 100644 --- a/bin/cqlsh.py +++ b/bin/cqlsh.py @@ -437,6 +437,9 @@ class Shell(cmd.Cmd): shunted_query_out = None use_paging = True + # TODO remove after virtual tables are added to connection metadata + virtual_keyspaces = None + default_page_size = 100 def __init__(self, hostname, port, color=False, @@ -628,7 +631,10 @@ class Shell(cmd.Cmd): self.connection_versions = vers def get_keyspace_names(self): - return map(str, self.conn.metadata.keyspaces.keys()) + # TODO remove after virtual tables are added to connection metadata + if self.virtual_keyspaces is None: + self.init_virtual_keyspaces_meta() + return map(str, self.conn.metadata.keyspaces.keys() + self.virtual_keyspaces.keys()) def get_columnfamily_names(self, ksname=None): if ksname is None: @@ -692,9 +698,78 @@ class Shell(cmd.Cmd): return self.conn.metadata.partitioner def get_keyspace_meta(self, ksname): - if ksname not in self.conn.metadata.keyspaces: - raise KeyspaceNotFound('Keyspace %r not found.' % ksname) - return self.conn.metadata.keyspaces[ksname] + if ksname in self.conn.metadata.keyspaces: + return self.conn.metadata.keyspaces[ksname] + + # TODO remove after virtual tables are added to connection metadata + if self.virtual_keyspaces is None: + self.init_virtual_keyspaces_meta() + if ksname in self.virtual_keyspaces: + return self.virtual_keyspaces[ksname] + + raise KeyspaceNotFound('Keyspace %r not found.' % ksname) + + # TODO remove after virtual tables are added to connection metadata + def init_virtual_keyspaces_meta(self): + self.virtual_keyspaces = {} + for vkeyspace in self.fetch_virtual_keyspaces(): + self.virtual_keyspaces[vkeyspace.name] = vkeyspace + + # TODO remove after virtual tables are added to connection metadata + def fetch_virtual_keyspaces(self): + keyspaces = [] + + result = self.session.execute('SELECT keyspace_name FROM system_virtual_schema.keyspaces;') + for row in result: + name = row['keyspace_name'] + keyspace = KeyspaceMetadata(name, False, None, None) + tables = self.fetch_virtual_tables(name) + for table in tables: + keyspace.tables[table.name] = table + keyspaces.append(keyspace) + + return keyspaces + + # TODO remove after virtual tables are added to connection metadata + def fetch_virtual_tables(self, keyspace_name): + tables = [] + + result = self.session.execute("SELECT * FROM system_virtual_schema.tables WHERE keyspace_name = '{}';".format(keyspace_name)) + for row in result: + name = row['table_name'] + table = TableMetadata(keyspace_name, name) + self.fetch_virtual_columns(table) + tables.append(table) + + return tables + + # TODO remove after virtual tables are added to connection metadata + def fetch_virtual_columns(self, table): + result = self.session.execute("SELECT * FROM system_virtual_schema.columns WHERE keyspace_name = '{}' AND table_name = '{}';".format(table.keyspace_name, table.name)) + + partition_key_columns = [] + clustering_columns = [] + + for row in result: + name = row['column_name'] + cql_type = row['type'] + kind = row['kind'] + position = row['position'] + is_static = kind == 'static' + is_reversed = row['clustering_order'] == 'desc' + column = ColumnMetadata(table, name, cql_type, is_static, is_reversed) + table.columns[column.name] = column + + if kind == 'partition_key': + partition_key_columns.append((position, column)) + elif kind == 'clustering': + clustering_columns.append((position, column)) + + partition_key_columns.sort(key=lambda t: t[0]) + clustering_columns.sort(key=lambda t: t[0]) + + table.partition_key = map(lambda t: t[1], partition_key_columns) + table.clustering_key = map(lambda t: t[1], clustering_columns) def get_keyspaces(self): return self.conn.metadata.keyspaces.values() @@ -707,7 +782,6 @@ class Shell(cmd.Cmd): if ksname is None: ksname = self.current_keyspace ksmeta = self.get_keyspace_meta(ksname) - if tablename not in ksmeta.tables: if ksname == 'system_auth' and tablename in ['roles', 'role_permissions']: self.get_fake_auth_table_meta(ksname, tablename) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java index f537255..265d354 100644 --- a/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java +++ b/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java @@ -26,7 +26,7 @@ import org.apache.cassandra.cql3.statements.Bound; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.RowFilter; import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.index.SecondaryIndexManager; +import org.apache.cassandra.index.IndexRegistry; import org.apache.cassandra.utils.btree.BTreeSet; import static org.apache.cassandra.cql3.statements.RequestValidations.checkFalse; @@ -199,7 +199,7 @@ final class ClusteringColumnRestrictions extends RestrictionSetWrapper @Override public void addRowFilterTo(RowFilter filter, - SecondaryIndexManager indexManager, + IndexRegistry indexRegistry, QueryOptions options) throws InvalidRequestException { int position = 0; @@ -207,9 +207,9 @@ final class ClusteringColumnRestrictions extends RestrictionSetWrapper for (SingleRestriction restriction : restrictions) { // We ignore all the clustering columns that can be handled by slices. - if (handleInFilter(restriction, position) || restriction.hasSupportingIndex(indexManager)) + if (handleInFilter(restriction, position) || restriction.hasSupportingIndex(indexRegistry)) { - restriction.addRowFilterTo(filter, indexManager, options); + restriction.addRowFilterTo(filter, indexRegistry, options); continue; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java b/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java index e1202a6..4c6ce2f 100644 --- a/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java +++ b/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java @@ -28,7 +28,7 @@ import org.apache.cassandra.cql3.statements.Bound; import org.apache.cassandra.db.MultiCBuilder; import org.apache.cassandra.db.filter.RowFilter; import org.apache.cassandra.index.Index; -import org.apache.cassandra.index.SecondaryIndexManager; +import org.apache.cassandra.index.IndexRegistry; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; @@ -111,9 +111,9 @@ public abstract class MultiColumnRestriction implements SingleRestriction } @Override - public final boolean hasSupportingIndex(SecondaryIndexManager indexManager) + public final boolean hasSupportingIndex(IndexRegistry indexRegistry) { - for (Index index : indexManager.listIndexes()) + for (Index index : indexRegistry.listIndexes()) if (isSupportedBy(index)) return true; return false; @@ -186,7 +186,7 @@ public abstract class MultiColumnRestriction implements SingleRestriction } @Override - public final void addRowFilterTo(RowFilter filter, SecondaryIndexManager indexMananger, QueryOptions options) + public final void addRowFilterTo(RowFilter filter, IndexRegistry indexRegistry, QueryOptions options) { Tuples.Value t = ((Tuples.Value) value.bind(options)); List<ByteBuffer> values = t.getElements(); @@ -244,7 +244,7 @@ public abstract class MultiColumnRestriction implements SingleRestriction @Override public final void addRowFilterTo(RowFilter filter, - SecondaryIndexManager indexManager, + IndexRegistry indexRegistry, QueryOptions options) { throw invalidRequest("IN restrictions are not supported on indexed columns"); @@ -473,7 +473,7 @@ public abstract class MultiColumnRestriction implements SingleRestriction @Override public final void addRowFilterTo(RowFilter filter, - SecondaryIndexManager indexManager, + IndexRegistry indexRegistry, QueryOptions options) { throw invalidRequest("Multi-column slice restrictions cannot be used for filtering."); @@ -561,7 +561,7 @@ public abstract class MultiColumnRestriction implements SingleRestriction } @Override - public final void addRowFilterTo(RowFilter filter, SecondaryIndexManager indexMananger, QueryOptions options) + public final void addRowFilterTo(RowFilter filter, IndexRegistry indexRegistry, QueryOptions options) { throw new UnsupportedOperationException("Secondary indexes do not support IS NOT NULL restrictions"); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java b/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java index ac589be..5bb3242 100644 --- a/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java +++ b/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java @@ -27,7 +27,7 @@ import org.apache.cassandra.db.ClusteringComparator; import org.apache.cassandra.db.ClusteringPrefix; import org.apache.cassandra.db.MultiCBuilder; import org.apache.cassandra.db.filter.RowFilter; -import org.apache.cassandra.index.SecondaryIndexManager; +import org.apache.cassandra.index.IndexRegistry; /** * A set of single restrictions on the partition key. @@ -121,12 +121,12 @@ final class PartitionKeySingleRestrictionSet extends RestrictionSetWrapper imple @Override public void addRowFilterTo(RowFilter filter, - SecondaryIndexManager indexManager, + IndexRegistry indexRegistry, QueryOptions options) { for (SingleRestriction restriction : restrictions) { - restriction.addRowFilterTo(filter, indexManager, options); + restriction.addRowFilterTo(filter, indexRegistry, options); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java b/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java index daace46..91dedad 100644 --- a/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java +++ b/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java @@ -23,7 +23,7 @@ import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.cql3.functions.Function; import org.apache.cassandra.db.filter.RowFilter; -import org.apache.cassandra.index.SecondaryIndexManager; +import org.apache.cassandra.index.IndexRegistry; /** * <p>Implementation of this class must be immutable.</p> @@ -63,19 +63,19 @@ public interface Restriction /** * Check if the restriction is on indexed columns. * - * @param indexManager the index manager + * @param indexRegistry the index registry * @return <code>true</code> if the restriction is on indexed columns, <code>false</code> */ - public boolean hasSupportingIndex(SecondaryIndexManager indexManager); + public boolean hasSupportingIndex(IndexRegistry indexRegistry); /** * Adds to the specified row filter the expressions corresponding to this <code>Restriction</code>. * * @param filter the row filter to add expressions to - * @param indexManager the secondary index manager + * @param indexRegistry the index registry * @param options the query options */ public void addRowFilterTo(RowFilter filter, - SecondaryIndexManager indexManager, + IndexRegistry indexRegistry, QueryOptions options); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java index 7c805ce..427c396 100644 --- a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java +++ b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java @@ -27,7 +27,7 @@ import org.apache.cassandra.cql3.functions.Function; import org.apache.cassandra.cql3.restrictions.SingleColumnRestriction.ContainsRestriction; import org.apache.cassandra.db.filter.RowFilter; import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.index.SecondaryIndexManager; +import org.apache.cassandra.index.IndexRegistry; import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.commons.lang3.builder.ToStringStyle; @@ -74,10 +74,10 @@ final class RestrictionSet implements Restrictions, Iterable<SingleRestriction> } @Override - public void addRowFilterTo(RowFilter filter, SecondaryIndexManager indexManager, QueryOptions options) throws InvalidRequestException + public void addRowFilterTo(RowFilter filter, IndexRegistry indexRegistry, QueryOptions options) throws InvalidRequestException { for (Restriction restriction : restrictions.values()) - restriction.addRowFilterTo(filter, indexManager, options); + restriction.addRowFilterTo(filter, indexRegistry, options); } @Override @@ -184,11 +184,11 @@ final class RestrictionSet implements Restrictions, Iterable<SingleRestriction> } @Override - public final boolean hasSupportingIndex(SecondaryIndexManager indexManager) + public final boolean hasSupportingIndex(IndexRegistry indexRegistry) { for (Restriction restriction : restrictions.values()) { - if (restriction.hasSupportingIndex(indexManager)) + if (restriction.hasSupportingIndex(indexRegistry)) return true; } return false; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java index c310908..9803adc 100644 --- a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java +++ b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java @@ -20,13 +20,14 @@ package org.apache.cassandra.cql3.restrictions; import java.util.List; import java.util.Set; +import org.apache.commons.lang3.builder.ToStringBuilder; +import org.apache.commons.lang3.builder.ToStringStyle; + import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.cql3.QueryOptions; import org.apache.cassandra.cql3.functions.Function; import org.apache.cassandra.db.filter.RowFilter; -import org.apache.cassandra.index.SecondaryIndexManager; -import org.apache.commons.lang3.builder.ToStringBuilder; -import org.apache.commons.lang3.builder.ToStringStyle; +import org.apache.cassandra.index.IndexRegistry; /** * A <code>RestrictionSet</code> wrapper that can be extended to allow to modify the <code>RestrictionSet</code> @@ -45,10 +46,10 @@ class RestrictionSetWrapper implements Restrictions } public void addRowFilterTo(RowFilter filter, - SecondaryIndexManager indexManager, + IndexRegistry indexRegistry, QueryOptions options) { - restrictions.addRowFilterTo(filter, indexManager, options); + restrictions.addRowFilterTo(filter, indexRegistry, options); } public List<ColumnMetadata> getColumnDefs() @@ -71,9 +72,9 @@ class RestrictionSetWrapper implements Restrictions return restrictions.size(); } - public boolean hasSupportingIndex(SecondaryIndexManager indexManager) + public boolean hasSupportingIndex(IndexRegistry indexRegistry) { - return restrictions.hasSupportingIndex(indexManager); + return restrictions.hasSupportingIndex(indexRegistry); } public ColumnMetadata getFirstColumn() http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java index 44f95a8..1b3482b 100644 --- a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java +++ b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java @@ -30,7 +30,7 @@ import org.apache.cassandra.cql3.statements.Bound; import org.apache.cassandra.db.MultiCBuilder; import org.apache.cassandra.db.filter.RowFilter; import org.apache.cassandra.index.Index; -import org.apache.cassandra.index.SecondaryIndexManager; +import org.apache.cassandra.index.IndexRegistry; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.Pair; @@ -71,9 +71,9 @@ public abstract class SingleColumnRestriction implements SingleRestriction } @Override - public boolean hasSupportingIndex(SecondaryIndexManager indexManager) + public boolean hasSupportingIndex(IndexRegistry indexRegistry) { - for (Index index : indexManager.listIndexes()) + for (Index index : indexRegistry.listIndexes()) if (isSupportedBy(index)) return true; @@ -151,7 +151,7 @@ public abstract class SingleColumnRestriction implements SingleRestriction @Override public void addRowFilterTo(RowFilter filter, - SecondaryIndexManager indexManager, + IndexRegistry indexRegistry, QueryOptions options) { filter.add(columnDef, Operator.EQ, value.bindAndGet(options)); @@ -215,7 +215,7 @@ public abstract class SingleColumnRestriction implements SingleRestriction @Override public void addRowFilterTo(RowFilter filter, - SecondaryIndexManager indexManager, + IndexRegistry indexRegistry, QueryOptions options) { throw invalidRequest("IN restrictions are not supported on indexed columns"); @@ -385,7 +385,7 @@ public abstract class SingleColumnRestriction implements SingleRestriction } @Override - public void addRowFilterTo(RowFilter filter, SecondaryIndexManager indexManager, QueryOptions options) + public void addRowFilterTo(RowFilter filter, IndexRegistry indexRegistry, QueryOptions options) { for (Bound b : Bound.values()) if (hasBound(b)) @@ -475,7 +475,7 @@ public abstract class SingleColumnRestriction implements SingleRestriction } @Override - public void addRowFilterTo(RowFilter filter, SecondaryIndexManager indexManager, QueryOptions options) + public void addRowFilterTo(RowFilter filter, IndexRegistry indexRegistry, QueryOptions options) { for (ByteBuffer value : bindAndGet(values, options)) filter.add(columnDef, Operator.CONTAINS, value); @@ -615,7 +615,7 @@ public abstract class SingleColumnRestriction implements SingleRestriction @Override public void addRowFilterTo(RowFilter filter, - SecondaryIndexManager indexManager, + IndexRegistry indexRegistry, QueryOptions options) { throw new UnsupportedOperationException("Secondary indexes do not support IS NOT NULL restrictions"); @@ -691,16 +691,16 @@ public abstract class SingleColumnRestriction implements SingleRestriction @Override public void addRowFilterTo(RowFilter filter, - SecondaryIndexManager indexManager, + IndexRegistry indexRegistry, QueryOptions options) { Pair<Operator, ByteBuffer> operation = makeSpecific(value.bindAndGet(options)); // there must be a suitable INDEX for LIKE_XXX expressions RowFilter.SimpleExpression expression = filter.add(columnDef, operation.left, operation.right); - indexManager.getBestIndexFor(expression) - .orElseThrow(() -> invalidRequest("%s is only supported on properly indexed columns", - expression)); + indexRegistry.getBestIndexFor(expression) + .orElseThrow(() -> invalidRequest("%s is only supported on properly indexed columns", + expression)); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java index 0240b6f..af1a964 100644 --- a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java +++ b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java @@ -32,7 +32,7 @@ import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.dht.*; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.index.Index; -import org.apache.cassandra.index.SecondaryIndexManager; +import org.apache.cassandra.index.IndexRegistry; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.btree.BTreeSet; @@ -133,14 +133,9 @@ public final class StatementRestrictions { this(type, table, allowFiltering); - ColumnFamilyStore cfs; - SecondaryIndexManager secondaryIndexManager = null; - + IndexRegistry indexRegistry = null; if (type.allowUseOfSecondaryIndices()) - { - cfs = Keyspace.open(table.keyspace).getColumnFamilyStore(table.name); - secondaryIndexManager = cfs.indexManager; - } + indexRegistry = IndexRegistry.obtain(table); /* * WHERE clause. For a given entity, rules are: @@ -165,7 +160,7 @@ public final class StatementRestrictions { Restriction restriction = relation.toRestriction(table, boundNames); - if (!type.allowUseOfSecondaryIndices() || !restriction.hasSupportingIndex(secondaryIndexManager)) + if (!type.allowUseOfSecondaryIndices() || !restriction.hasSupportingIndex(indexRegistry)) throw new InvalidRequestException(String.format("LIKE restriction is only supported on properly " + "indexed columns. %s is not valid.", relation.toString())); @@ -186,13 +181,13 @@ public final class StatementRestrictions if (type.allowUseOfSecondaryIndices()) { if (whereClause.containsCustomExpressions()) - processCustomIndexExpressions(whereClause.expressions, boundNames, secondaryIndexManager); + processCustomIndexExpressions(whereClause.expressions, boundNames, indexRegistry); - hasQueriableClusteringColumnIndex = clusteringColumnsRestrictions.hasSupportingIndex(secondaryIndexManager); + hasQueriableClusteringColumnIndex = clusteringColumnsRestrictions.hasSupportingIndex(indexRegistry); hasQueriableIndex = !filterRestrictions.getCustomIndexExpressions().isEmpty() || hasQueriableClusteringColumnIndex - || partitionKeyRestrictions.hasSupportingIndex(secondaryIndexManager) - || nonPrimaryKeyRestrictions.hasSupportingIndex(secondaryIndexManager); + || partitionKeyRestrictions.hasSupportingIndex(indexRegistry) + || nonPrimaryKeyRestrictions.hasSupportingIndex(indexRegistry); } // At this point, the select statement if fully constructed, but we still have a few things to validate @@ -564,7 +559,7 @@ public final class StatementRestrictions private void processCustomIndexExpressions(List<CustomIndexExpression> expressions, VariableSpecifications boundNames, - SecondaryIndexManager indexManager) + IndexRegistry indexRegistry) { if (expressions.size() > 1) throw new InvalidRequestException(IndexRestrictions.MULTIPLE_EXPRESSIONS); @@ -582,7 +577,7 @@ public final class StatementRestrictions if (!table.indexes.has(expression.targetIndex.getIdx())) throw IndexRestrictions.indexNotFound(expression.targetIndex, table); - Index index = indexManager.getIndex(table.indexes.get(expression.targetIndex.getIdx()).get()); + Index index = indexRegistry.getIndex(table.indexes.get(expression.targetIndex.getIdx()).get()); if (!index.getIndexMetadata().isCustom()) throw IndexRestrictions.nonCustomIndexInExpression(expression.targetIndex); @@ -596,14 +591,14 @@ public final class StatementRestrictions filterRestrictions.add(expression); } - public RowFilter getRowFilter(SecondaryIndexManager indexManager, QueryOptions options) + public RowFilter getRowFilter(IndexRegistry indexRegistry, QueryOptions options) { if (filterRestrictions.isEmpty()) return RowFilter.NONE; RowFilter filter = RowFilter.create(); for (Restrictions restrictions : filterRestrictions.getRestrictions()) - restrictions.addRowFilterTo(filter, indexManager, options); + restrictions.addRowFilterTo(filter, indexRegistry, options); for (CustomIndexExpression expression : filterRestrictions.getCustomIndexExpressions()) expression.addToRowFilter(filter, table, options); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java b/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java index a80b811..437b17c 100644 --- a/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java +++ b/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java @@ -34,7 +34,7 @@ import org.apache.cassandra.db.filter.RowFilter; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.index.SecondaryIndexManager; +import org.apache.cassandra.index.IndexRegistry; import static org.apache.cassandra.cql3.statements.Bound.END; import static org.apache.cassandra.cql3.statements.Bound.START; @@ -272,15 +272,15 @@ final class TokenFilter implements PartitionKeyRestrictions } @Override - public boolean hasSupportingIndex(SecondaryIndexManager indexManager) + public boolean hasSupportingIndex(IndexRegistry indexRegistry) { - return restrictions.hasSupportingIndex(indexManager); + return restrictions.hasSupportingIndex(indexRegistry); } @Override - public void addRowFilterTo(RowFilter filter, SecondaryIndexManager indexManager, QueryOptions options) + public void addRowFilterTo(RowFilter filter, IndexRegistry indexRegistry, QueryOptions options) { - restrictions.addRowFilterTo(filter, indexManager, options); + restrictions.addRowFilterTo(filter, indexRegistry, options); } @Override http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java b/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java index 806974a..e71b177 100644 --- a/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java +++ b/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java @@ -30,7 +30,7 @@ import org.apache.cassandra.cql3.functions.Function; import org.apache.cassandra.cql3.statements.Bound; import org.apache.cassandra.db.filter.RowFilter; import org.apache.cassandra.exceptions.InvalidRequestException; -import org.apache.cassandra.index.SecondaryIndexManager; +import org.apache.cassandra.index.IndexRegistry; import static org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest; @@ -116,13 +116,13 @@ public abstract class TokenRestriction implements PartitionKeyRestrictions } @Override - public boolean hasSupportingIndex(SecondaryIndexManager secondaryIndexManager) + public boolean hasSupportingIndex(IndexRegistry indexRegistry) { return false; } @Override - public void addRowFilterTo(RowFilter filter, SecondaryIndexManager indexManager, QueryOptions options) + public void addRowFilterTo(RowFilter filter, IndexRegistry indexRegistry, QueryOptions options) { throw new UnsupportedOperationException("Index expression cannot be created for token restriction"); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java index e26910d..00d2b94 100644 --- a/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java @@ -66,6 +66,8 @@ public class AlterKeyspaceStatement extends SchemaAlteringStatement throw new InvalidRequestException("Unknown keyspace " + name); if (SchemaConstants.isLocalSystemKeyspace(ksm.name)) throw new InvalidRequestException("Cannot alter system keyspace"); + if (ksm.isVirtual()) + throw new InvalidRequestException("Cannot alter virtual keyspaces"); attrs.validate(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java index b3aeb74..260c8fd 100644 --- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java @@ -88,6 +88,9 @@ public class AlterTableStatement extends SchemaAlteringStatement if (current.isView()) throw new InvalidRequestException("Cannot use ALTER TABLE on Materialized View"); + if (current.isVirtual()) + throw new InvalidRequestException("Cannot alter virtual tables"); + TableMetadata.Builder builder = current.unbuild(); ColumnIdentifier columnName = null; http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java index 2fcd867..a71c799 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java @@ -70,6 +70,8 @@ public class BatchStatement implements CQLStatement private final boolean updatesStaticRow; private final Attributes attrs; private final boolean hasConditions; + private final boolean updatesVirtualTables; + private static final Logger logger = LoggerFactory.getLogger(BatchStatement.class); private static final String UNLOGGED_BATCH_WARNING = "Unlogged batch covering {} partitions detected " + @@ -103,11 +105,13 @@ public class BatchStatement implements CQLStatement RegularAndStaticColumns.Builder conditionBuilder = RegularAndStaticColumns.builder(); boolean updateRegular = false; boolean updateStatic = false; + boolean updatesVirtualTables = false; for (ModificationStatement stmt : statements) { regularBuilder.addAll(stmt.metadata(), stmt.updatedColumns()); updateRegular |= stmt.updatesRegularRows(); + updatesVirtualTables |= stmt.isVirtual(); if (stmt.hasConditions()) { hasConditions = true; @@ -121,6 +125,7 @@ public class BatchStatement implements CQLStatement this.updatesRegularRows = updateRegular; this.updatesStaticRow = updateStatic; this.hasConditions = hasConditions; + this.updatesVirtualTables = updatesVirtualTables; } public Iterable<org.apache.cassandra.cql3.functions.Function> getFunctions() @@ -161,29 +166,46 @@ public class BatchStatement implements CQLStatement boolean hasCounters = false; boolean hasNonCounters = false; + boolean hasVirtualTables = false; + boolean hasRegularTables = false; + for (ModificationStatement statement : statements) { - if (timestampSet && statement.isCounter()) - throw new InvalidRequestException("Cannot provide custom timestamp for a BATCH containing counters"); - if (timestampSet && statement.isTimestampSet()) throw new InvalidRequestException("Timestamp must be set either on BATCH or individual statements"); - if (isCounter() && !statement.isCounter()) - throw new InvalidRequestException("Cannot include non-counter statement in a counter batch"); - - if (isLogged() && statement.isCounter()) - throw new InvalidRequestException("Cannot include a counter statement in a logged batch"); - if (statement.isCounter()) hasCounters = true; else hasNonCounters = true; + + if (statement.isVirtual()) + hasVirtualTables = true; + else + hasRegularTables = true; } + if (timestampSet && hasCounters) + throw new InvalidRequestException("Cannot provide custom timestamp for a BATCH containing counters"); + + if (isCounter() && hasNonCounters) + throw new InvalidRequestException("Cannot include non-counter statement in a counter batch"); + if (hasCounters && hasNonCounters) throw new InvalidRequestException("Counter and non-counter mutations cannot exist in the same batch"); + if (isLogged() && hasCounters) + throw new InvalidRequestException("Cannot include a counter statement in a logged batch"); + + if (isLogged() && hasVirtualTables) + throw new InvalidRequestException("Cannot include a virtual table statement in a logged batch"); + + if (hasVirtualTables && hasRegularTables) + throw new InvalidRequestException("Mutations for virtual and regular tables cannot exist in the same batch"); + + if (hasConditions && hasVirtualTables) + throw new InvalidRequestException("Conditional BATCH statements cannot include mutations for virtual tables"); + if (hasConditions) { String ksName = null; @@ -353,7 +375,11 @@ public class BatchStatement implements CQLStatement if (hasConditions) return executeWithConditions(options, queryState, queryStartNanoTime); - executeWithoutConditions(getMutations(options, local, now, queryStartNanoTime), options.getConsistency(), queryStartNanoTime); + if (updatesVirtualTables) + executeInternalWithoutCondition(queryState, options, queryStartNanoTime); + else + executeWithoutConditions(getMutations(options, local, now, queryStartNanoTime), options.getConsistency(), queryStartNanoTime); + return new ResultMessage.Void(); } @@ -482,16 +508,18 @@ public class BatchStatement implements CQLStatement public ResultMessage executeInternal(QueryState queryState, QueryOptions options) throws RequestValidationException, RequestExecutionException { + BatchQueryOptions batchOptions = BatchQueryOptions.withoutPerStatementVariables(options); + if (hasConditions) - return executeInternalWithConditions(BatchQueryOptions.withoutPerStatementVariables(options), queryState); + return executeInternalWithConditions(batchOptions, queryState); - executeInternalWithoutCondition(queryState, options, System.nanoTime()); + executeInternalWithoutCondition(queryState, batchOptions, System.nanoTime()); return new ResultMessage.Void(); } - private ResultMessage executeInternalWithoutCondition(QueryState queryState, QueryOptions options, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException + private ResultMessage executeInternalWithoutCondition(QueryState queryState, BatchQueryOptions batchOptions, long queryStartNanoTime) throws RequestValidationException, RequestExecutionException { - for (IMutation mutation : getMutations(BatchQueryOptions.withoutPerStatementVariables(options), true, queryState.getTimestamp(), queryStartNanoTime)) + for (IMutation mutation : getMutations(batchOptions, true, queryState.getTimestamp(), queryStartNanoTime)) mutation.apply(); return null; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java index 9671b02..96d9f5a 100644 --- a/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java +++ b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java @@ -19,10 +19,10 @@ package org.apache.cassandra.cql3.statements; import java.nio.ByteBuffer; import java.util.*; -import java.util.stream.Collectors; import com.google.common.collect.ImmutableMap; +import org.apache.cassandra.db.virtual.VirtualMutation; import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.db.*; @@ -84,15 +84,20 @@ final class BatchUpdatesCollector implements UpdatesCollector private IMutationBuilder getMutationBuilder(TableMetadata metadata, DecoratedKey dk, ConsistencyLevel consistency) { - String ksName = metadata.keyspace; - IMutationBuilder mutationBuilder = keyspaceMap(ksName).get(dk.getKey()); - if (mutationBuilder == null) + return keyspaceMap(metadata.keyspace).computeIfAbsent(dk.getKey(), k -> makeMutationBuilder(metadata, dk, consistency)); + } + + private IMutationBuilder makeMutationBuilder(TableMetadata metadata, DecoratedKey partitionKey, ConsistencyLevel cl) + { + if (metadata.isVirtual()) { - MutationBuilder builder = new MutationBuilder(ksName, dk); - mutationBuilder = metadata.isCounter() ? new CounterMutationBuilder(builder, consistency) : builder; - keyspaceMap(ksName).put(dk.getKey(), mutationBuilder); + return new VirtualMutationBuilder(metadata.keyspace, partitionKey); + } + else + { + MutationBuilder builder = new MutationBuilder(metadata.keyspace, partitionKey); + return metadata.isCounter() ? new CounterMutationBuilder(builder, cl) : builder; } - return mutationBuilder; } /** @@ -228,4 +233,41 @@ final class BatchUpdatesCollector implements UpdatesCollector return mutationBuilder.get(id); } } + + private static class VirtualMutationBuilder implements IMutationBuilder + { + private final String keyspaceName; + private final DecoratedKey partitionKey; + + private final HashMap<TableId, PartitionUpdate.Builder> modifications = new HashMap<>(); + + private VirtualMutationBuilder(String keyspaceName, DecoratedKey partitionKey) + { + this.keyspaceName = keyspaceName; + this.partitionKey = partitionKey; + } + + @Override + public VirtualMutationBuilder add(PartitionUpdate.Builder builder) + { + PartitionUpdate.Builder prev = modifications.put(builder.metadata().id, builder); + if (null != prev) + throw new IllegalStateException(); + return this; + } + + @Override + public VirtualMutation build() + { + ImmutableMap.Builder<TableId, PartitionUpdate> updates = new ImmutableMap.Builder<>(); + modifications.forEach((tableId, updateBuilder) -> updates.put(tableId, updateBuilder.build())); + return new VirtualMutation(keyspaceName, partitionKey, updates.build()); + } + + @Override + public PartitionUpdate.Builder get(TableId tableId) + { + return modifications.get(tableId); + } + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java index eb6aa70..7953c8b 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java +++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java @@ -22,6 +22,7 @@ import java.util.*; import com.google.common.collect.*; +import org.apache.cassandra.index.IndexRegistry; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.cql3.*; import org.apache.cassandra.cql3.conditions.ColumnCondition; @@ -183,7 +184,7 @@ public class CQL3CasRequest implements CASRequest return conditionColumns; } - public SinglePartitionReadCommand readCommand(int nowInSec) + public SinglePartitionReadQuery readCommand(int nowInSec) { assert staticConditions != null || !conditions.isEmpty(); @@ -193,16 +194,16 @@ public class CQL3CasRequest implements CASRequest // With only a static condition, we still want to make the distinction between a non-existing partition and one // that exists (has some live data) but has not static content. So we query the first live row of the partition. if (conditions.isEmpty()) - return SinglePartitionReadCommand.create(metadata, - nowInSec, - columnFilter, - RowFilter.NONE, - DataLimits.cqlLimits(1), - key, - new ClusteringIndexSliceFilter(Slices.ALL, false)); + return SinglePartitionReadQuery.create(metadata, + nowInSec, + columnFilter, + RowFilter.NONE, + DataLimits.cqlLimits(1), + key, + new ClusteringIndexSliceFilter(Slices.ALL, false)); ClusteringIndexNamesFilter filter = new ClusteringIndexNamesFilter(conditions.navigableKeySet(), false); - return SinglePartitionReadCommand.create(metadata, nowInSec, key, columnFilter, filter); + return SinglePartitionReadQuery.create(metadata, nowInSec, key, columnFilter, filter); } /** @@ -244,7 +245,7 @@ public class CQL3CasRequest implements CASRequest upd.applyUpdates(current, updateBuilder); PartitionUpdate partitionUpdate = updateBuilder.build(); - Keyspace.openAndGetStore(metadata).indexManager.validate(partitionUpdate); + IndexRegistry.obtain(metadata).validate(partitionUpdate); return partitionUpdate; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java index d3ef599..e428087 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java @@ -26,6 +26,7 @@ import java.util.List; import org.apache.cassandra.audit.AuditLogEntryType; import org.apache.cassandra.auth.*; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.cql3.*; import org.apache.cassandra.cql3.functions.*; @@ -206,8 +207,12 @@ public final class CreateAggregateStatement extends SchemaAlteringStatement if (ifNotExists && orReplace) throw new InvalidRequestException("Cannot use both 'OR REPLACE' and 'IF NOT EXISTS' directives"); - if (Schema.instance.getKeyspaceMetadata(functionName.keyspace) == null) + + KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(functionName.keyspace); + if (null == ksm) throw new InvalidRequestException(String.format("Cannot add aggregate '%s' to non existing keyspace '%s'.", functionName.name, functionName.keyspace)); + if (ksm.isVirtual()) + throw new InvalidRequestException("Cannot create aggregates in virtual keyspaces"); } public Event.SchemaChange announceMigration(QueryState queryState, boolean isLocalOnly) throws RequestValidationException http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java index 1f0e703..c380991 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java @@ -24,6 +24,7 @@ import java.util.List; import org.apache.cassandra.audit.AuditLogEntryType; import org.apache.cassandra.auth.*; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.cql3.CQL3Type; import org.apache.cassandra.cql3.ColumnIdentifier; @@ -136,8 +137,11 @@ public final class CreateFunctionStatement extends SchemaAlteringStatement if (ifNotExists && orReplace) throw new InvalidRequestException("Cannot use both 'OR REPLACE' and 'IF NOT EXISTS' directives"); - if (Schema.instance.getKeyspaceMetadata(functionName.keyspace) == null) + KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(functionName.keyspace); + if (null == ksm) throw new InvalidRequestException(String.format("Cannot add function '%s' to non existing keyspace '%s'.", functionName.name, functionName.keyspace)); + if (ksm.isVirtual()) + throw new InvalidRequestException("Cannot create functions in virtual keyspaces"); } public Event.SchemaChange announceMigration(QueryState queryState, boolean isLocalOnly) throws RequestValidationException http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java index 9d0a714..778c4a3 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java @@ -82,6 +82,9 @@ public class CreateIndexStatement extends SchemaAlteringStatement { TableMetadata table = Schema.instance.validateTable(keyspace(), columnFamily()); + if (table.isVirtual()) + throw new InvalidRequestException("Secondary indexes are not supported on virtual tables"); + if (table.isCounter()) throw new InvalidRequestException("Secondary indexes are not supported on counter tables"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java index 55249c4..7c639e2 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java @@ -20,6 +20,7 @@ package org.apache.cassandra.cql3.statements; import java.nio.ByteBuffer; import java.util.*; import java.util.regex.Pattern; + import com.google.common.collect.HashMultiset; import com.google.common.collect.Multiset; import org.apache.commons.lang3.StringUtils; @@ -134,7 +135,6 @@ public class CreateTableStatement extends SchemaAlteringStatement .isCompound(isCompound) .isCounter(hasCounters) .isSuper(false) - .isView(false) .params(params); for (int i = 0; i < keyAliases.size(); i++) @@ -213,6 +213,9 @@ public class CreateTableStatement extends SchemaAlteringStatement KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(keyspace()); if (ksm == null) throw new ConfigurationException(String.format("Keyspace %s doesn't exist", keyspace())); + if (ksm.isVirtual()) + throw new InvalidRequestException("Cannot create tables in virtual keyspaces"); + return prepare(ksm.types); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java index d57bff7..f2cd217 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java @@ -62,6 +62,8 @@ public class CreateTriggerStatement extends SchemaAlteringStatement public void validate(ClientState state) throws RequestValidationException { TableMetadata metadata = Schema.instance.validateTable(keyspace(), columnFamily()); + if (metadata.isVirtual()) + throw new InvalidRequestException("Cannot CREATE TRIGGER against a virtual table"); if (metadata.isView()) throw new InvalidRequestException("Cannot CREATE TRIGGER against a materialized view"); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java index 7dce478..1a0da4c 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java @@ -73,6 +73,8 @@ public class CreateTypeStatement extends SchemaAlteringStatement KeyspaceMetadata ksm = Schema.instance.getKeyspaceMetadata(name.getKeyspace()); if (ksm == null) throw new InvalidRequestException(String.format("Cannot add type in unknown keyspace %s", name.getKeyspace())); + if (ksm.isVirtual()) + throw new InvalidRequestException("Cannot create types in virtual keyspaces"); if (ksm.types.get(name.getUserTypeName()).isPresent() && !ifNotExists) throw new InvalidRequestException(String.format("A user type of name %s already exists", name)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java index 01ed6fe..b50a552 100644 --- a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java @@ -151,6 +151,8 @@ public class CreateViewStatement extends SchemaAlteringStatement TableMetadata metadata = Schema.instance.validateTable(baseName.getKeyspace(), baseName.getColumnFamily()); + if (metadata.isVirtual()) + throw new InvalidRequestException("Materialized views are not supported on virtual tables"); if (metadata.isCounter()) throw new InvalidRequestException("Materialized views are not supported on counter tables"); if (metadata.isView()) @@ -317,7 +319,7 @@ public class CreateViewStatement extends SchemaAlteringStatement TableMetadata.Builder builder = TableMetadata.builder(keyspace(), columnFamily(), properties.properties.getId()) - .isView(true) + .kind(TableMetadata.Kind.VIEW) .params(params); add(metadata, targetPartitionKeys, builder::addPartitionKeyColumn); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java index af04572..639286c 100644 --- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java @@ -144,6 +144,8 @@ public class DeleteStatement extends ModificationStatement Conditions conditions, Attributes attrs) { + checkFalse(metadata.isVirtual(), "Virtual tables don't support DELETE statements"); + Operations operations = new Operations(type); for (Operation.RawDeletion deletion : deletions) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java index cfc6564..2f302c0 100644 --- a/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java @@ -19,6 +19,7 @@ package org.apache.cassandra.cql3.statements; import org.apache.cassandra.audit.AuditLogEntryType; import org.apache.cassandra.auth.Permission; +import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.RequestValidationException; @@ -61,6 +62,9 @@ public class DropKeyspaceStatement extends SchemaAlteringStatement public Event.SchemaChange announceMigration(QueryState queryState, boolean isLocalOnly) throws ConfigurationException { + if (null != VirtualKeyspaceRegistry.instance.getKeyspaceNullable(keyspace)) + throw new InvalidRequestException("Cannot drop virtual keyspaces"); + try { MigrationManager.announceKeyspaceDrop(keyspace, isLocalOnly); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java b/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java index d7801e5..beb1002 100644 --- a/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java @@ -75,6 +75,9 @@ public class DropTableStatement extends SchemaAlteringStatement if (metadata.isView()) throw new InvalidRequestException("Cannot use DROP TABLE on Materialized View"); + if (metadata.isVirtual()) + throw new InvalidRequestException("Cannot drop virtual tables"); + boolean rejectDrop = false; StringBuilder messageBuilder = new StringBuilder(); for (ViewMetadata def : ksm.views) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java index 31aa80c..e02fd41 100644 --- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java @@ -203,6 +203,11 @@ public abstract class ModificationStatement implements CQLStatement return metadata().isView(); } + public boolean isVirtual() + { + return metadata().isVirtual(); + } + public long getTimestamp(long now, QueryOptions options) throws InvalidRequestException { return attrs.getTimestamp(now, options); @@ -248,6 +253,8 @@ public abstract class ModificationStatement implements CQLStatement checkFalse(isCounter() && attrs.isTimestampSet(), "Cannot provide custom timestamp for counter updates"); checkFalse(isCounter() && attrs.isTimeToLiveSet(), "Cannot provide custom TTL for counter updates"); checkFalse(isView(), "Cannot directly modify a materialized view"); + checkFalse(isVirtual() && attrs.isTimeToLiveSet(), "Expiring columns are not supported by virtual tables"); + checkFalse(isVirtual() && hasConditions(), "Conditional updates are not supported by virtual tables"); } public RegularAndStaticColumns updatedColumns() @@ -440,6 +447,9 @@ public abstract class ModificationStatement implements CQLStatement private ResultMessage executeWithoutCondition(QueryState queryState, QueryOptions options, long queryStartNanoTime) throws RequestExecutionException, RequestValidationException { + if (isVirtual()) + return executeInternalWithoutCondition(queryState, options, queryStartNanoTime); + ConsistencyLevel cl = options.getConsistency(); if (isCounter()) cl.validateCounterForWrite(metadata()); @@ -613,7 +623,7 @@ public abstract class ModificationStatement implements CQLStatement { UUID ballot = UUIDGen.getTimeUUIDFromMicros(state.getTimestamp()); - SinglePartitionReadCommand readCommand = request.readCommand(FBUtilities.nowInSeconds()); + SinglePartitionReadQuery readCommand = request.readCommand(FBUtilities.nowInSeconds()); FilteredPartition current; try (ReadExecutionController executionController = readCommand.executionController(); PartitionIterator iter = readCommand.executeInternal(executionController)) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java index 7fa9964..ef3db51 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java @@ -55,7 +55,7 @@ import org.apache.cassandra.db.rows.RowIterator; import org.apache.cassandra.db.view.View; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.exceptions.*; -import org.apache.cassandra.index.SecondaryIndexManager; +import org.apache.cassandra.index.IndexRegistry; import org.apache.cassandra.serializers.MarshalException; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.ClientWarn; @@ -458,7 +458,7 @@ public class SelectStatement implements CQLStatement { QueryPager pager = query.getPager(options.getPagingState(), options.getProtocolVersion()); - if (aggregationSpec == null || query == ReadQuery.EMPTY) + if (aggregationSpec == null || query.isEmpty()) return pager; return new AggregationQueryPager(pager, query.limits()); @@ -501,25 +501,22 @@ public class SelectStatement implements CQLStatement { Collection<ByteBuffer> keys = restrictions.getPartitionKeys(options); if (keys.isEmpty()) - return ReadQuery.EMPTY; + return ReadQuery.empty(table); ClusteringIndexFilter filter = makeClusteringIndexFilter(options, columnFilter); if (filter == null) - return ReadQuery.EMPTY; + return ReadQuery.empty(table); RowFilter rowFilter = getRowFilter(options); - // Note that we use the total limit for every key, which is potentially inefficient. - // However, IN + LIMIT is not a very sensible choice. - List<SinglePartitionReadCommand> commands = new ArrayList<>(keys.size()); + List<DecoratedKey> decoratedKeys = new ArrayList<>(keys.size()); for (ByteBuffer key : keys) { QueryProcessor.validateKey(key); - DecoratedKey dk = table.partitioner.decorateKey(ByteBufferUtil.clone(key)); - commands.add(SinglePartitionReadCommand.create(table, nowInSec, columnFilter, rowFilter, limit, dk, filter)); + decoratedKeys.add(table.partitioner.decorateKey(ByteBufferUtil.clone(key))); } - return new SinglePartitionReadCommand.Group(commands, limit); + return SinglePartitionReadQuery.createGroup(table, nowInSec, columnFilter, rowFilter, limit, decoratedKeys, filter); } /** @@ -569,7 +566,7 @@ public class SelectStatement implements CQLStatement { ClusteringIndexFilter clusteringIndexFilter = makeClusteringIndexFilter(options, columnFilter); if (clusteringIndexFilter == null) - return ReadQuery.EMPTY; + return ReadQuery.empty(table); RowFilter rowFilter = getRowFilter(options); @@ -577,10 +574,10 @@ public class SelectStatement implements CQLStatement // We want to have getRangeSlice to count the number of columns, not the number of keys. AbstractBounds<PartitionPosition> keyBounds = restrictions.getPartitionKeyBounds(options); if (keyBounds == null) - return ReadQuery.EMPTY; + return ReadQuery.empty(table); - PartitionRangeReadCommand command = - PartitionRangeReadCommand.create(table, nowInSec, columnFilter, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter)); + ReadQuery command = + PartitionRangeReadQuery.create(table, nowInSec, columnFilter, rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter)); // If there's a secondary index that the command can use, have it validate the request parameters. command.maybeValidateIndex(); @@ -755,10 +752,8 @@ public class SelectStatement implements CQLStatement */ public RowFilter getRowFilter(QueryOptions options) throws InvalidRequestException { - ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(columnFamily()); - SecondaryIndexManager secondaryIndexManager = cfs.indexManager; - RowFilter filter = restrictions.getRowFilter(secondaryIndexManager, options); - return filter; + IndexRegistry indexRegistry = IndexRegistry.obtain(table); + return restrictions.getRowFilter(indexRegistry, options); } private ResultSet process(PartitionIterator partitions, http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java index 9eaf897..1def3fd 100644 --- a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java +++ b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java @@ -31,6 +31,7 @@ import org.apache.cassandra.db.IMutation; import org.apache.cassandra.db.Mutation; import org.apache.cassandra.db.RegularAndStaticColumns; import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.virtual.VirtualMutation; import org.apache.cassandra.schema.TableMetadata; /** @@ -86,12 +87,15 @@ final class SingleTableUpdatesCollector implements UpdatesCollector List<IMutation> ms = new ArrayList<>(); for (PartitionUpdate.Builder builder : puBuilders.values()) { - IMutation mutation = null; + IMutation mutation; - if (metadata.isCounter()) + if (metadata.isVirtual()) + mutation = new VirtualMutation(builder.build()); + else if (metadata.isCounter()) mutation = new CounterMutation(new Mutation(builder.build()), counterConsistencyLevel); else mutation = new Mutation(builder.build()); + mutation.validateIndexedColumns(); ms.add(mutation); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java index 5d09cfa..d41a814 100644 --- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java +++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java @@ -69,6 +69,9 @@ public class TruncateStatement extends CFStatement implements CQLStatement if (metaData.isView()) throw new InvalidRequestException("Cannot TRUNCATE materialized view directly; must truncate base table instead"); + if (metaData.isVirtual()) + throw new InvalidRequestException("Cannot truncate virtual tables"); + StorageProxy.truncateBlocking(keyspace(), columnFamily()); } catch (UnavailableException | TimeoutException e) @@ -82,6 +85,13 @@ public class TruncateStatement extends CFStatement implements CQLStatement { try { + TableMetadata metaData = Schema.instance.getTableMetadata(keyspace(), columnFamily()); + if (metaData.isView()) + throw new InvalidRequestException("Cannot TRUNCATE materialized view directly; must truncate base table instead"); + + if (metaData.isVirtual()) + throw new InvalidRequestException("Cannot truncate virtual tables"); + ColumnFamilyStore cfs = Keyspace.open(keyspace()).getColumnFamilyStore(columnFamily()); cfs.truncateBlocking(); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/AbstractReadQuery.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/AbstractReadQuery.java b/src/java/org/apache/cassandra/db/AbstractReadQuery.java new file mode 100644 index 0000000..c6ec329 --- /dev/null +++ b/src/java/org/apache/cassandra/db/AbstractReadQuery.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db; + +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.monitoring.MonitorableImpl; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators; +import org.apache.cassandra.schema.TableMetadata; + +/** + * Base class for {@code ReadQuery} implementations. + */ +abstract class AbstractReadQuery extends MonitorableImpl implements ReadQuery +{ + private final TableMetadata metadata; + private final int nowInSec; + + private final ColumnFilter columnFilter; + private final RowFilter rowFilter; + private final DataLimits limits; + + protected AbstractReadQuery(TableMetadata metadata, int nowInSec, ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits) + { + this.metadata = metadata; + this.nowInSec = nowInSec; + this.columnFilter = columnFilter; + this.rowFilter = rowFilter; + this.limits = limits; + } + + @Override + public TableMetadata metadata() + { + return metadata; + } + + // Monitorable interface + public String name() + { + return toCQLString(); + } + + @Override + public PartitionIterator executeInternal(ReadExecutionController controller) + { + return UnfilteredPartitionIterators.filter(executeLocally(controller), nowInSec()); + } + + @Override + public DataLimits limits() + { + return limits; + } + + @Override + public int nowInSec() + { + return nowInSec; + } + + @Override + public RowFilter rowFilter() + { + return rowFilter; + } + + @Override + public ColumnFilter columnFilter() + { + return columnFilter; + } + + /** + * Recreate the CQL string corresponding to this query. + * <p> + * Note that in general the returned string will not be exactly the original user string, first + * because there isn't always a single syntax for a given query, but also because we don't have + * all the information needed (we know the non-PK columns queried but not the PK ones as internally + * we query them all). So this shouldn't be relied too strongly, but this should be good enough for + * debugging purpose which is what this is for. + */ + public String toCQLString() + { + StringBuilder sb = new StringBuilder().append("SELECT ") + .append(columnFilter()) + .append(" FROM ") + .append(metadata().keyspace) + .append('.') + .append(metadata().name); + appendCQLWhereClause(sb); + + if (limits() != DataLimits.NONE) + sb.append(' ').append(limits()); + return sb.toString(); + } + + protected abstract void appendCQLWhereClause(StringBuilder sb); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/Keyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Keyspace.java b/src/java/org/apache/cassandra/db/Keyspace.java index 651d156..50720f4 100644 --- a/src/java/org/apache/cassandra/db/Keyspace.java +++ b/src/java/org/apache/cassandra/db/Keyspace.java @@ -327,6 +327,8 @@ public class Keyspace { metadata = Schema.instance.getKeyspaceMetadata(keyspaceName); assert metadata != null : "Unknown keyspace " + keyspaceName; + if (metadata.isVirtual()) + throw new IllegalStateException("Cannot initialize Keyspace with virtual metadata " + keyspaceName); createReplicationStrategy(metadata); this.metric = new KeyspaceMetrics(this); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java index 27d7d4e..a6641d4 100644 --- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java +++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java @@ -45,15 +45,13 @@ import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.IndexMetadata; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.StorageProxy; -import org.apache.cassandra.service.pager.*; import org.apache.cassandra.tracing.Tracing; -import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.FBUtilities; /** * A read command that selects a (part of a) range of partitions. */ -public class PartitionRangeReadCommand extends ReadCommand +public class PartitionRangeReadCommand extends ReadCommand implements PartitionRangeReadQuery { protected static final SelectionDeserializer selectionDeserializer = new Deserializer(); @@ -187,7 +185,8 @@ public class PartitionRangeReadCommand extends ReadCommand indexMetadata()); } - public ReadCommand withUpdatedLimit(DataLimits newLimits) + @Override + public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits) { return new PartitionRangeReadCommand(isDigestQuery(), digestVersion(), @@ -200,6 +199,7 @@ public class PartitionRangeReadCommand extends ReadCommand indexMetadata()); } + @Override public PartitionRangeReadCommand withUpdatedLimitsAndDataRange(DataLimits newLimits, DataRange newDataRange) { return new PartitionRangeReadCommand(isDigestQuery(), @@ -218,34 +218,11 @@ public class PartitionRangeReadCommand extends ReadCommand return DatabaseDescriptor.getRangeRpcTimeout(); } - public boolean selectsKey(DecoratedKey key) - { - if (!dataRange().contains(key)) - return false; - - return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, metadata().partitionKeyType); - } - - public boolean selectsClustering(DecoratedKey key, Clustering clustering) - { - if (clustering == Clustering.STATIC_CLUSTERING) - return !columnFilter().fetchedColumns().statics.isEmpty(); - - if (!dataRange().clusteringIndexFilter(key).selects(clustering)) - return false; - return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering); - } - public PartitionIterator execute(ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) throws RequestExecutionException { return StorageProxy.getRangeSlice(this, consistency, queryStartNanoTime); } - public QueryPager getPager(PagingState pagingState, ProtocolVersion protocolVersion) - { - return new PartitionRangeQueryPager(this, pagingState, protocolVersion); - } - protected void recordLatency(TableMetrics metric, long latencyNanos) { metric.rangeLatency.addNano(latencyNanos); @@ -389,13 +366,6 @@ public class PartitionRangeReadCommand extends ReadCommand } @Override - public boolean selectsFullPartition() - { - return metadata().isStaticCompactTable() || - (dataRange.selectsAllPartition() && !rowFilter().hasExpressionOnClusteringOrRegularColumns()); - } - - @Override public String toString() { return String.format("Read(%s columns=%s rowfilter=%s limits=%s %s)", --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org