Implement virtual keyspace interface

patch by Benjamin Lehrer, Chris Lohfink, and Aleksey Yeschenko for
CASSANDRA-7622


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/0d464cd2
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/0d464cd2
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/0d464cd2

Branch: refs/heads/trunk
Commit: 0d464cd25ffbb5734f96c3082f9cc35011de3667
Parents: 3b6c938
Author: Chris Lohfink <clohf...@apple.com>
Authored: Wed May 16 23:07:04 2018 +0100
Committer: Aleksey Yeshchenko <alek...@apple.com>
Committed: Fri May 18 16:45:45 2018 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 bin/cqlsh.py                                    |  84 ++++-
 .../ClusteringColumnRestrictions.java           |   8 +-
 .../restrictions/MultiColumnRestriction.java    |  14 +-
 .../PartitionKeySingleRestrictionSet.java       |   6 +-
 .../cql3/restrictions/Restriction.java          |  10 +-
 .../cql3/restrictions/RestrictionSet.java       |  10 +-
 .../restrictions/RestrictionSetWrapper.java     |  15 +-
 .../restrictions/SingleColumnRestriction.java   |  24 +-
 .../restrictions/StatementRestrictions.java     |  29 +-
 .../cql3/restrictions/TokenFilter.java          |  10 +-
 .../cql3/restrictions/TokenRestriction.java     |   6 +-
 .../cql3/statements/AlterKeyspaceStatement.java |   2 +
 .../cql3/statements/AlterTableStatement.java    |   3 +
 .../cql3/statements/BatchStatement.java         |  56 ++-
 .../cql3/statements/BatchUpdatesCollector.java  |  58 ++-
 .../cql3/statements/CQL3CasRequest.java         |  21 +-
 .../statements/CreateAggregateStatement.java    |   7 +-
 .../statements/CreateFunctionStatement.java     |   6 +-
 .../cql3/statements/CreateIndexStatement.java   |   3 +
 .../cql3/statements/CreateTableStatement.java   |   5 +-
 .../cql3/statements/CreateTriggerStatement.java |   2 +
 .../cql3/statements/CreateTypeStatement.java    |   2 +
 .../cql3/statements/CreateViewStatement.java    |   4 +-
 .../cql3/statements/DeleteStatement.java        |   2 +
 .../cql3/statements/DropKeyspaceStatement.java  |   4 +
 .../cql3/statements/DropTableStatement.java     |   3 +
 .../cql3/statements/ModificationStatement.java  |  12 +-
 .../cql3/statements/SelectStatement.java        |  31 +-
 .../statements/SingleTableUpdatesCollector.java |   8 +-
 .../cql3/statements/TruncateStatement.java      |  10 +
 .../apache/cassandra/db/AbstractReadQuery.java  | 116 ++++++
 src/java/org/apache/cassandra/db/Keyspace.java  |   2 +
 .../cassandra/db/PartitionRangeReadCommand.java |  38 +-
 .../cassandra/db/PartitionRangeReadQuery.java   |  93 +++++
 .../org/apache/cassandra/db/ReadCommand.java    | 100 +----
 src/java/org/apache/cassandra/db/ReadQuery.java | 184 ++++++---
 .../db/SinglePartitionReadCommand.java          | 178 ++-------
 .../cassandra/db/SinglePartitionReadQuery.java  | 290 +++++++++++++++
 .../db/VirtualTablePartitionRangeReadQuery.java | 113 ++++++
 .../cassandra/db/VirtualTableReadQuery.java     |  65 ++++
 .../VirtualTableSinglePartitionReadQuery.java   | 194 ++++++++++
 .../cassandra/db/filter/ColumnFilter.java       |   2 +
 .../db/partitions/PartitionIterators.java       |  10 +-
 .../db/partitions/PartitionUpdate.java          |   3 +-
 .../db/virtual/AbstractVirtualTable.java        | 221 +++++++++++
 .../cassandra/db/virtual/SimpleDataSet.java     | 191 ++++++++++
 .../db/virtual/SystemViewsKeyspace.java         |  32 ++
 .../cassandra/db/virtual/VirtualKeyspace.java   |  58 +++
 .../db/virtual/VirtualKeyspaceRegistry.java     |  77 ++++
 .../cassandra/db/virtual/VirtualMutation.java   | 111 ++++++
 .../db/virtual/VirtualSchemaKeyspace.java       | 149 ++++++++
 .../cassandra/db/virtual/VirtualTable.java      |  74 ++++
 .../apache/cassandra/index/IndexRegistry.java   |  69 ++++
 .../index/internal/CassandraIndex.java          |   1 +
 .../cassandra/schema/KeyspaceMetadata.java      |  39 +-
 .../org/apache/cassandra/schema/Schema.java     |  19 +-
 .../apache/cassandra/schema/SchemaKeyspace.java |   2 +-
 .../apache/cassandra/schema/TableMetadata.java  |  54 +--
 .../apache/cassandra/service/CASRequest.java    |   4 +-
 .../cassandra/service/CassandraDaemon.java      |   5 +
 .../apache/cassandra/service/ClientState.java   |   5 +
 .../apache/cassandra/service/StorageProxy.java  |  18 +-
 .../cassandra/service/StorageService.java       |  18 +-
 .../service/pager/AbstractQueryPager.java       |  30 +-
 .../service/pager/MultiPartitionPager.java      |  46 +--
 .../service/pager/PartitionRangeQueryPager.java |  51 ++-
 .../service/pager/SinglePartitionPager.java     |  33 +-
 .../cassandra/cache/CacheProviderTest.java      |   1 +
 .../org/apache/cassandra/cql3/CQLTester.java    |   5 +
 .../validation/entities/VirtualTableTest.java   | 372 +++++++++++++++++++
 71 files changed, 2936 insertions(+), 593 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5657567..e7e5ecb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0
+ * Implement virtual keyspace interface (CASSANDRA-7622)
  * nodetool import cleanup and improvements (CASSANDRA-14417)
  * Bump jackson version to >= 2.9.5 (CASSANDRA-14427)
  * Allow nodetool toppartitions without specifying table (CASSANDRA-14360)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/bin/cqlsh.py
----------------------------------------------------------------------
diff --git a/bin/cqlsh.py b/bin/cqlsh.py
index 3055110..e8e380f 100644
--- a/bin/cqlsh.py
+++ b/bin/cqlsh.py
@@ -437,6 +437,9 @@ class Shell(cmd.Cmd):
     shunted_query_out = None
     use_paging = True
 
+    # TODO remove after virtual tables are added to connection metadata
+    virtual_keyspaces = None
+
     default_page_size = 100
 
     def __init__(self, hostname, port, color=False,
@@ -628,7 +631,10 @@ class Shell(cmd.Cmd):
         self.connection_versions = vers
 
     def get_keyspace_names(self):
-        return map(str, self.conn.metadata.keyspaces.keys())
+        # TODO remove after virtual tables are added to connection metadata
+        if self.virtual_keyspaces is None:
+            self.init_virtual_keyspaces_meta()
+        return map(str, self.conn.metadata.keyspaces.keys() + 
self.virtual_keyspaces.keys())
 
     def get_columnfamily_names(self, ksname=None):
         if ksname is None:
@@ -692,9 +698,78 @@ class Shell(cmd.Cmd):
         return self.conn.metadata.partitioner
 
     def get_keyspace_meta(self, ksname):
-        if ksname not in self.conn.metadata.keyspaces:
-            raise KeyspaceNotFound('Keyspace %r not found.' % ksname)
-        return self.conn.metadata.keyspaces[ksname]
+        if ksname in self.conn.metadata.keyspaces:
+            return self.conn.metadata.keyspaces[ksname]
+
+        # TODO remove after virtual tables are added to connection metadata
+        if self.virtual_keyspaces is None:
+            self.init_virtual_keyspaces_meta()
+        if ksname in self.virtual_keyspaces:
+            return self.virtual_keyspaces[ksname]
+
+        raise KeyspaceNotFound('Keyspace %r not found.' % ksname)
+
+    # TODO remove after virtual tables are added to connection metadata
+    def init_virtual_keyspaces_meta(self):
+        self.virtual_keyspaces = {}
+        for vkeyspace in self.fetch_virtual_keyspaces():
+            self.virtual_keyspaces[vkeyspace.name] = vkeyspace
+
+    # TODO remove after virtual tables are added to connection metadata
+    def fetch_virtual_keyspaces(self):
+        keyspaces = []
+
+        result = self.session.execute('SELECT keyspace_name FROM 
system_virtual_schema.keyspaces;')
+        for row in result:
+            name = row['keyspace_name']
+            keyspace = KeyspaceMetadata(name, False, None, None)
+            tables = self.fetch_virtual_tables(name)
+            for table in tables:
+                keyspace.tables[table.name] = table
+            keyspaces.append(keyspace)
+
+        return keyspaces
+
+    # TODO remove after virtual tables are added to connection metadata
+    def fetch_virtual_tables(self, keyspace_name):
+        tables = []
+
+        result = self.session.execute("SELECT * FROM 
system_virtual_schema.tables WHERE keyspace_name = '{}';".format(keyspace_name))
+        for row in result:
+            name = row['table_name']
+            table = TableMetadata(keyspace_name, name)
+            self.fetch_virtual_columns(table)
+            tables.append(table)
+
+        return tables
+
+    # TODO remove after virtual tables are added to connection metadata
+    def fetch_virtual_columns(self, table):
+        result = self.session.execute("SELECT * FROM 
system_virtual_schema.columns WHERE keyspace_name = '{}' AND table_name = 
'{}';".format(table.keyspace_name, table.name))
+
+        partition_key_columns = []
+        clustering_columns = []
+
+        for row in result:
+            name = row['column_name']
+            cql_type = row['type']
+            kind = row['kind']
+            position = row['position']
+            is_static = kind == 'static'
+            is_reversed = row['clustering_order'] == 'desc'
+            column = ColumnMetadata(table, name, cql_type, is_static, 
is_reversed)
+            table.columns[column.name] = column
+
+            if kind == 'partition_key':
+                partition_key_columns.append((position, column))
+            elif kind == 'clustering':
+                clustering_columns.append((position, column))
+
+        partition_key_columns.sort(key=lambda t: t[0])
+        clustering_columns.sort(key=lambda t: t[0])
+
+        table.partition_key  = map(lambda t: t[1], partition_key_columns)
+        table.clustering_key = map(lambda t: t[1], clustering_columns)
 
     def get_keyspaces(self):
         return self.conn.metadata.keyspaces.values()
@@ -707,7 +782,6 @@ class Shell(cmd.Cmd):
         if ksname is None:
             ksname = self.current_keyspace
         ksmeta = self.get_keyspace_meta(ksname)
-
         if tablename not in ksmeta.tables:
             if ksname == 'system_auth' and tablename in ['roles', 
'role_permissions']:
                 self.get_fake_auth_table_meta(ksname, tablename)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java
 
b/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java
index f537255..265d354 100644
--- 
a/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java
+++ 
b/src/java/org/apache/cassandra/cql3/restrictions/ClusteringColumnRestrictions.java
@@ -26,7 +26,7 @@ import org.apache.cassandra.cql3.statements.Bound;
 import org.apache.cassandra.db.*;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
 import org.apache.cassandra.utils.btree.BTreeSet;
 
 import static 
org.apache.cassandra.cql3.statements.RequestValidations.checkFalse;
@@ -199,7 +199,7 @@ final class ClusteringColumnRestrictions extends 
RestrictionSetWrapper
 
     @Override
     public void addRowFilterTo(RowFilter filter,
-                               SecondaryIndexManager indexManager,
+                               IndexRegistry indexRegistry,
                                QueryOptions options) throws 
InvalidRequestException
     {
         int position = 0;
@@ -207,9 +207,9 @@ final class ClusteringColumnRestrictions extends 
RestrictionSetWrapper
         for (SingleRestriction restriction : restrictions)
         {
             // We ignore all the clustering columns that can be handled by 
slices.
-            if (handleInFilter(restriction, position) || 
restriction.hasSupportingIndex(indexManager))
+            if (handleInFilter(restriction, position) || 
restriction.hasSupportingIndex(indexRegistry))
             {
-                restriction.addRowFilterTo(filter, indexManager, options);
+                restriction.addRowFilterTo(filter, indexRegistry, options);
                 continue;
             }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java 
b/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java
index e1202a6..4c6ce2f 100644
--- 
a/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java
+++ 
b/src/java/org/apache/cassandra/cql3/restrictions/MultiColumnRestriction.java
@@ -28,7 +28,7 @@ import org.apache.cassandra.cql3.statements.Bound;
 import org.apache.cassandra.db.MultiCBuilder;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.index.Index;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
 
@@ -111,9 +111,9 @@ public abstract class MultiColumnRestriction implements 
SingleRestriction
     }
 
     @Override
-    public final boolean hasSupportingIndex(SecondaryIndexManager indexManager)
+    public final boolean hasSupportingIndex(IndexRegistry indexRegistry)
     {
-        for (Index index : indexManager.listIndexes())
+        for (Index index : indexRegistry.listIndexes())
            if (isSupportedBy(index))
                return true;
         return false;
@@ -186,7 +186,7 @@ public abstract class MultiColumnRestriction implements 
SingleRestriction
         }
 
         @Override
-        public final void addRowFilterTo(RowFilter filter, 
SecondaryIndexManager indexMananger, QueryOptions options)
+        public final void addRowFilterTo(RowFilter filter, IndexRegistry 
indexRegistry, QueryOptions options)
         {
             Tuples.Value t = ((Tuples.Value) value.bind(options));
             List<ByteBuffer> values = t.getElements();
@@ -244,7 +244,7 @@ public abstract class MultiColumnRestriction implements 
SingleRestriction
 
         @Override
         public final void addRowFilterTo(RowFilter filter,
-                                         SecondaryIndexManager indexManager,
+                                         IndexRegistry indexRegistry,
                                          QueryOptions options)
         {
             throw  invalidRequest("IN restrictions are not supported on 
indexed columns");
@@ -473,7 +473,7 @@ public abstract class MultiColumnRestriction implements 
SingleRestriction
 
         @Override
         public final void addRowFilterTo(RowFilter filter,
-                                         SecondaryIndexManager indexManager,
+                                         IndexRegistry indexRegistry,
                                          QueryOptions options)
         {
             throw invalidRequest("Multi-column slice restrictions cannot be 
used for filtering.");
@@ -561,7 +561,7 @@ public abstract class MultiColumnRestriction implements 
SingleRestriction
         }
 
         @Override
-        public final void addRowFilterTo(RowFilter filter, 
SecondaryIndexManager indexMananger, QueryOptions options)
+        public final void addRowFilterTo(RowFilter filter, IndexRegistry 
indexRegistry, QueryOptions options)
         {
             throw new UnsupportedOperationException("Secondary indexes do not 
support IS NOT NULL restrictions");
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java
 
b/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java
index ac589be..5bb3242 100644
--- 
a/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java
+++ 
b/src/java/org/apache/cassandra/cql3/restrictions/PartitionKeySingleRestrictionSet.java
@@ -27,7 +27,7 @@ import org.apache.cassandra.db.ClusteringComparator;
 import org.apache.cassandra.db.ClusteringPrefix;
 import org.apache.cassandra.db.MultiCBuilder;
 import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
 
 /**
  * A set of single restrictions on the partition key.
@@ -121,12 +121,12 @@ final class PartitionKeySingleRestrictionSet extends 
RestrictionSetWrapper imple
 
     @Override
     public void addRowFilterTo(RowFilter filter,
-                               SecondaryIndexManager indexManager,
+                               IndexRegistry indexRegistry,
                                QueryOptions options)
     {
         for (SingleRestriction restriction : restrictions)
         {
-             restriction.addRowFilterTo(filter, indexManager, options);
+             restriction.addRowFilterTo(filter, indexRegistry, options);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java 
b/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java
index daace46..91dedad 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/Restriction.java
@@ -23,7 +23,7 @@ import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
 
 /**
  * <p>Implementation of this class must be immutable.</p>
@@ -63,19 +63,19 @@ public interface Restriction
     /**
      * Check if the restriction is on indexed columns.
      *
-     * @param indexManager the index manager
+     * @param indexRegistry the index registry
      * @return <code>true</code> if the restriction is on indexed columns, 
<code>false</code>
      */
-    public boolean hasSupportingIndex(SecondaryIndexManager indexManager);
+    public boolean hasSupportingIndex(IndexRegistry indexRegistry);
 
     /**
      * Adds to the specified row filter the expressions corresponding to this 
<code>Restriction</code>.
      *
      * @param filter the row filter to add expressions to
-     * @param indexManager the secondary index manager
+     * @param indexRegistry the index registry
      * @param options the query options
      */
     public void addRowFilterTo(RowFilter filter,
-                               SecondaryIndexManager indexManager,
+                               IndexRegistry indexRegistry,
                                QueryOptions options);
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java 
b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
index 7c805ce..427c396 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSet.java
@@ -27,7 +27,7 @@ import org.apache.cassandra.cql3.functions.Function;
 import 
org.apache.cassandra.cql3.restrictions.SingleColumnRestriction.ContainsRestriction;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
 
@@ -74,10 +74,10 @@ final class RestrictionSet implements Restrictions, 
Iterable<SingleRestriction>
     }
 
     @Override
-    public void addRowFilterTo(RowFilter filter, SecondaryIndexManager 
indexManager, QueryOptions options) throws InvalidRequestException
+    public void addRowFilterTo(RowFilter filter, IndexRegistry indexRegistry, 
QueryOptions options) throws InvalidRequestException
     {
         for (Restriction restriction : restrictions.values())
-            restriction.addRowFilterTo(filter, indexManager, options);
+            restriction.addRowFilterTo(filter, indexRegistry, options);
     }
 
     @Override
@@ -184,11 +184,11 @@ final class RestrictionSet implements Restrictions, 
Iterable<SingleRestriction>
     }
 
     @Override
-    public final boolean hasSupportingIndex(SecondaryIndexManager indexManager)
+    public final boolean hasSupportingIndex(IndexRegistry indexRegistry)
     {
         for (Restriction restriction : restrictions.values())
         {
-            if (restriction.hasSupportingIndex(indexManager))
+            if (restriction.hasSupportingIndex(indexRegistry))
                 return true;
         }
         return false;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java 
b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java
index c310908..9803adc 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/RestrictionSetWrapper.java
@@ -20,13 +20,14 @@ package org.apache.cassandra.cql3.restrictions;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.cql3.QueryOptions;
 import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.db.filter.RowFilter;
-import org.apache.cassandra.index.SecondaryIndexManager;
-import org.apache.commons.lang3.builder.ToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.cassandra.index.IndexRegistry;
 
 /**
  * A <code>RestrictionSet</code> wrapper that can be extended to allow to 
modify the <code>RestrictionSet</code>
@@ -45,10 +46,10 @@ class RestrictionSetWrapper implements Restrictions
     }
 
     public void addRowFilterTo(RowFilter filter,
-                               SecondaryIndexManager indexManager,
+                               IndexRegistry indexRegistry,
                                QueryOptions options)
     {
-        restrictions.addRowFilterTo(filter, indexManager, options);
+        restrictions.addRowFilterTo(filter, indexRegistry, options);
     }
 
     public List<ColumnMetadata> getColumnDefs()
@@ -71,9 +72,9 @@ class RestrictionSetWrapper implements Restrictions
         return restrictions.size();
     }
 
-    public boolean hasSupportingIndex(SecondaryIndexManager indexManager)
+    public boolean hasSupportingIndex(IndexRegistry indexRegistry)
     {
-        return restrictions.hasSupportingIndex(indexManager);
+        return restrictions.hasSupportingIndex(indexRegistry);
     }
 
     public ColumnMetadata getFirstColumn()

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java 
b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
index 44f95a8..1b3482b 100644
--- 
a/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
+++ 
b/src/java/org/apache/cassandra/cql3/restrictions/SingleColumnRestriction.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.cql3.statements.Bound;
 import org.apache.cassandra.db.MultiCBuilder;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.index.Index;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.Pair;
 
@@ -71,9 +71,9 @@ public abstract class SingleColumnRestriction implements 
SingleRestriction
     }
 
     @Override
-    public boolean hasSupportingIndex(SecondaryIndexManager indexManager)
+    public boolean hasSupportingIndex(IndexRegistry indexRegistry)
     {
-        for (Index index : indexManager.listIndexes())
+        for (Index index : indexRegistry.listIndexes())
             if (isSupportedBy(index))
                 return true;
 
@@ -151,7 +151,7 @@ public abstract class SingleColumnRestriction implements 
SingleRestriction
 
         @Override
         public void addRowFilterTo(RowFilter filter,
-                                   SecondaryIndexManager indexManager,
+                                   IndexRegistry indexRegistry,
                                    QueryOptions options)
         {
             filter.add(columnDef, Operator.EQ, value.bindAndGet(options));
@@ -215,7 +215,7 @@ public abstract class SingleColumnRestriction implements 
SingleRestriction
 
         @Override
         public void addRowFilterTo(RowFilter filter,
-                                   SecondaryIndexManager indexManager,
+                                   IndexRegistry indexRegistry,
                                    QueryOptions options)
         {
             throw invalidRequest("IN restrictions are not supported on indexed 
columns");
@@ -385,7 +385,7 @@ public abstract class SingleColumnRestriction implements 
SingleRestriction
         }
 
         @Override
-        public void addRowFilterTo(RowFilter filter, SecondaryIndexManager 
indexManager, QueryOptions options)
+        public void addRowFilterTo(RowFilter filter, IndexRegistry 
indexRegistry, QueryOptions options)
         {
             for (Bound b : Bound.values())
                 if (hasBound(b))
@@ -475,7 +475,7 @@ public abstract class SingleColumnRestriction implements 
SingleRestriction
         }
 
         @Override
-        public void addRowFilterTo(RowFilter filter, SecondaryIndexManager 
indexManager, QueryOptions options)
+        public void addRowFilterTo(RowFilter filter, IndexRegistry 
indexRegistry, QueryOptions options)
         {
             for (ByteBuffer value : bindAndGet(values, options))
                 filter.add(columnDef, Operator.CONTAINS, value);
@@ -615,7 +615,7 @@ public abstract class SingleColumnRestriction implements 
SingleRestriction
 
         @Override
         public void addRowFilterTo(RowFilter filter,
-                                   SecondaryIndexManager indexManager,
+                                   IndexRegistry indexRegistry,
                                    QueryOptions options)
         {
             throw new UnsupportedOperationException("Secondary indexes do not 
support IS NOT NULL restrictions");
@@ -691,16 +691,16 @@ public abstract class SingleColumnRestriction implements 
SingleRestriction
 
         @Override
         public void addRowFilterTo(RowFilter filter,
-                                   SecondaryIndexManager indexManager,
+                                   IndexRegistry indexRegistry,
                                    QueryOptions options)
         {
             Pair<Operator, ByteBuffer> operation = 
makeSpecific(value.bindAndGet(options));
 
             // there must be a suitable INDEX for LIKE_XXX expressions
             RowFilter.SimpleExpression expression = filter.add(columnDef, 
operation.left, operation.right);
-            indexManager.getBestIndexFor(expression)
-                        .orElseThrow(() -> invalidRequest("%s is only 
supported on properly indexed columns",
-                                                          expression));
+            indexRegistry.getBestIndexFor(expression)
+                         .orElseThrow(() -> invalidRequest("%s is only 
supported on properly indexed columns",
+                                                           expression));
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java 
b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
index 0240b6f..af1a964 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/StatementRestrictions.java
@@ -32,7 +32,7 @@ import org.apache.cassandra.db.marshal.AbstractType;
 import org.apache.cassandra.dht.*;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.index.Index;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
 import org.apache.cassandra.schema.ColumnMetadata;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.utils.btree.BTreeSet;
@@ -133,14 +133,9 @@ public final class StatementRestrictions
     {
         this(type, table, allowFiltering);
 
-        ColumnFamilyStore cfs;
-        SecondaryIndexManager secondaryIndexManager = null;
-
+        IndexRegistry indexRegistry = null;
         if (type.allowUseOfSecondaryIndices())
-        {
-            cfs = 
Keyspace.open(table.keyspace).getColumnFamilyStore(table.name);
-            secondaryIndexManager = cfs.indexManager;
-        }
+            indexRegistry = IndexRegistry.obtain(table);
 
         /*
          * WHERE clause. For a given entity, rules are:
@@ -165,7 +160,7 @@ public final class StatementRestrictions
             {
                 Restriction restriction = relation.toRestriction(table, 
boundNames);
 
-                if (!type.allowUseOfSecondaryIndices() || 
!restriction.hasSupportingIndex(secondaryIndexManager))
+                if (!type.allowUseOfSecondaryIndices() || 
!restriction.hasSupportingIndex(indexRegistry))
                     throw new InvalidRequestException(String.format("LIKE 
restriction is only supported on properly " +
                                                                     "indexed 
columns. %s is not valid.",
                                                                     
relation.toString()));
@@ -186,13 +181,13 @@ public final class StatementRestrictions
         if (type.allowUseOfSecondaryIndices())
         {
             if (whereClause.containsCustomExpressions())
-                processCustomIndexExpressions(whereClause.expressions, 
boundNames, secondaryIndexManager);
+                processCustomIndexExpressions(whereClause.expressions, 
boundNames, indexRegistry);
 
-            hasQueriableClusteringColumnIndex = 
clusteringColumnsRestrictions.hasSupportingIndex(secondaryIndexManager);
+            hasQueriableClusteringColumnIndex = 
clusteringColumnsRestrictions.hasSupportingIndex(indexRegistry);
             hasQueriableIndex = 
!filterRestrictions.getCustomIndexExpressions().isEmpty()
                     || hasQueriableClusteringColumnIndex
-                    || 
partitionKeyRestrictions.hasSupportingIndex(secondaryIndexManager)
-                    || 
nonPrimaryKeyRestrictions.hasSupportingIndex(secondaryIndexManager);
+                    || 
partitionKeyRestrictions.hasSupportingIndex(indexRegistry)
+                    || 
nonPrimaryKeyRestrictions.hasSupportingIndex(indexRegistry);
         }
 
         // At this point, the select statement if fully constructed, but we 
still have a few things to validate
@@ -564,7 +559,7 @@ public final class StatementRestrictions
 
     private void processCustomIndexExpressions(List<CustomIndexExpression> 
expressions,
                                                VariableSpecifications 
boundNames,
-                                               SecondaryIndexManager 
indexManager)
+                                               IndexRegistry indexRegistry)
     {
         if (expressions.size() > 1)
             throw new 
InvalidRequestException(IndexRestrictions.MULTIPLE_EXPRESSIONS);
@@ -582,7 +577,7 @@ public final class StatementRestrictions
         if (!table.indexes.has(expression.targetIndex.getIdx()))
             throw IndexRestrictions.indexNotFound(expression.targetIndex, 
table);
 
-        Index index = 
indexManager.getIndex(table.indexes.get(expression.targetIndex.getIdx()).get());
+        Index index = 
indexRegistry.getIndex(table.indexes.get(expression.targetIndex.getIdx()).get());
 
         if (!index.getIndexMetadata().isCustom())
             throw 
IndexRestrictions.nonCustomIndexInExpression(expression.targetIndex);
@@ -596,14 +591,14 @@ public final class StatementRestrictions
         filterRestrictions.add(expression);
     }
 
-    public RowFilter getRowFilter(SecondaryIndexManager indexManager, 
QueryOptions options)
+    public RowFilter getRowFilter(IndexRegistry indexRegistry, QueryOptions 
options)
     {
         if (filterRestrictions.isEmpty())
             return RowFilter.NONE;
 
         RowFilter filter = RowFilter.create();
         for (Restrictions restrictions : filterRestrictions.getRestrictions())
-            restrictions.addRowFilterTo(filter, indexManager, options);
+            restrictions.addRowFilterTo(filter, indexRegistry, options);
 
         for (CustomIndexExpression expression : 
filterRestrictions.getCustomIndexExpressions())
             expression.addToRowFilter(filter, table, options);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java 
b/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
index a80b811..437b17c 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/TokenFilter.java
@@ -34,7 +34,7 @@ import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.dht.IPartitioner;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
 
 import static org.apache.cassandra.cql3.statements.Bound.END;
 import static org.apache.cassandra.cql3.statements.Bound.START;
@@ -272,15 +272,15 @@ final class TokenFilter implements 
PartitionKeyRestrictions
     }
 
     @Override
-    public boolean hasSupportingIndex(SecondaryIndexManager indexManager)
+    public boolean hasSupportingIndex(IndexRegistry indexRegistry)
     {
-        return restrictions.hasSupportingIndex(indexManager);
+        return restrictions.hasSupportingIndex(indexRegistry);
     }
 
     @Override
-    public void addRowFilterTo(RowFilter filter, SecondaryIndexManager 
indexManager, QueryOptions options)
+    public void addRowFilterTo(RowFilter filter, IndexRegistry indexRegistry, 
QueryOptions options)
     {
-        restrictions.addRowFilterTo(filter, indexManager, options);
+        restrictions.addRowFilterTo(filter, indexRegistry, options);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java 
b/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
index 806974a..e71b177 100644
--- a/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
+++ b/src/java/org/apache/cassandra/cql3/restrictions/TokenRestriction.java
@@ -30,7 +30,7 @@ import org.apache.cassandra.cql3.functions.Function;
 import org.apache.cassandra.cql3.statements.Bound;
 import org.apache.cassandra.db.filter.RowFilter;
 import org.apache.cassandra.exceptions.InvalidRequestException;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
 
 import static 
org.apache.cassandra.cql3.statements.RequestValidations.invalidRequest;
 
@@ -116,13 +116,13 @@ public abstract class TokenRestriction implements 
PartitionKeyRestrictions
     }
 
     @Override
-    public boolean hasSupportingIndex(SecondaryIndexManager 
secondaryIndexManager)
+    public boolean hasSupportingIndex(IndexRegistry indexRegistry)
     {
         return false;
     }
 
     @Override
-    public void addRowFilterTo(RowFilter filter, SecondaryIndexManager 
indexManager, QueryOptions options)
+    public void addRowFilterTo(RowFilter filter, IndexRegistry indexRegistry, 
QueryOptions options)
     {
         throw new UnsupportedOperationException("Index expression cannot be 
created for token restriction");
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
index e26910d..00d2b94 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterKeyspaceStatement.java
@@ -66,6 +66,8 @@ public class AlterKeyspaceStatement extends 
SchemaAlteringStatement
             throw new InvalidRequestException("Unknown keyspace " + name);
         if (SchemaConstants.isLocalSystemKeyspace(ksm.name))
             throw new InvalidRequestException("Cannot alter system keyspace");
+        if (ksm.isVirtual())
+            throw new InvalidRequestException("Cannot alter virtual 
keyspaces");
 
         attrs.validate();
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
index b3aeb74..260c8fd 100644
--- a/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/AlterTableStatement.java
@@ -88,6 +88,9 @@ public class AlterTableStatement extends 
SchemaAlteringStatement
         if (current.isView())
             throw new InvalidRequestException("Cannot use ALTER TABLE on 
Materialized View");
 
+        if (current.isVirtual())
+            throw new InvalidRequestException("Cannot alter virtual tables");
+
         TableMetadata.Builder builder = current.unbuild();
 
         ColumnIdentifier columnName = null;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
index 2fcd867..a71c799 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchStatement.java
@@ -70,6 +70,8 @@ public class BatchStatement implements CQLStatement
     private final boolean updatesStaticRow;
     private final Attributes attrs;
     private final boolean hasConditions;
+    private final boolean updatesVirtualTables;
+
     private static final Logger logger = 
LoggerFactory.getLogger(BatchStatement.class);
 
     private static final String UNLOGGED_BATCH_WARNING = "Unlogged batch 
covering {} partitions detected " +
@@ -103,11 +105,13 @@ public class BatchStatement implements CQLStatement
         RegularAndStaticColumns.Builder conditionBuilder = 
RegularAndStaticColumns.builder();
         boolean updateRegular = false;
         boolean updateStatic = false;
+        boolean updatesVirtualTables = false;
 
         for (ModificationStatement stmt : statements)
         {
             regularBuilder.addAll(stmt.metadata(), stmt.updatedColumns());
             updateRegular |= stmt.updatesRegularRows();
+            updatesVirtualTables |= stmt.isVirtual();
             if (stmt.hasConditions())
             {
                 hasConditions = true;
@@ -121,6 +125,7 @@ public class BatchStatement implements CQLStatement
         this.updatesRegularRows = updateRegular;
         this.updatesStaticRow = updateStatic;
         this.hasConditions = hasConditions;
+        this.updatesVirtualTables = updatesVirtualTables;
     }
 
     public Iterable<org.apache.cassandra.cql3.functions.Function> 
getFunctions()
@@ -161,29 +166,46 @@ public class BatchStatement implements CQLStatement
         boolean hasCounters = false;
         boolean hasNonCounters = false;
 
+        boolean hasVirtualTables = false;
+        boolean hasRegularTables = false;
+
         for (ModificationStatement statement : statements)
         {
-            if (timestampSet && statement.isCounter())
-                throw new InvalidRequestException("Cannot provide custom 
timestamp for a BATCH containing counters");
-
             if (timestampSet && statement.isTimestampSet())
                 throw new InvalidRequestException("Timestamp must be set 
either on BATCH or individual statements");
 
-            if (isCounter() && !statement.isCounter())
-                throw new InvalidRequestException("Cannot include non-counter 
statement in a counter batch");
-
-            if (isLogged() && statement.isCounter())
-                throw new InvalidRequestException("Cannot include a counter 
statement in a logged batch");
-
             if (statement.isCounter())
                 hasCounters = true;
             else
                 hasNonCounters = true;
+
+            if (statement.isVirtual())
+                hasVirtualTables = true;
+            else
+                hasRegularTables = true;
         }
 
+        if (timestampSet && hasCounters)
+            throw new InvalidRequestException("Cannot provide custom timestamp 
for a BATCH containing counters");
+
+        if (isCounter() && hasNonCounters)
+            throw new InvalidRequestException("Cannot include non-counter 
statement in a counter batch");
+
         if (hasCounters && hasNonCounters)
             throw new InvalidRequestException("Counter and non-counter 
mutations cannot exist in the same batch");
 
+        if (isLogged() && hasCounters)
+            throw new InvalidRequestException("Cannot include a counter 
statement in a logged batch");
+
+        if (isLogged() && hasVirtualTables)
+            throw new InvalidRequestException("Cannot include a virtual table 
statement in a logged batch");
+
+        if (hasVirtualTables && hasRegularTables)
+            throw new InvalidRequestException("Mutations for virtual and 
regular tables cannot exist in the same batch");
+
+        if (hasConditions && hasVirtualTables)
+            throw new InvalidRequestException("Conditional BATCH statements 
cannot include mutations for virtual tables");
+
         if (hasConditions)
         {
             String ksName = null;
@@ -353,7 +375,11 @@ public class BatchStatement implements CQLStatement
         if (hasConditions)
             return executeWithConditions(options, queryState, 
queryStartNanoTime);
 
-        executeWithoutConditions(getMutations(options, local, now, 
queryStartNanoTime), options.getConsistency(), queryStartNanoTime);
+        if (updatesVirtualTables)
+            executeInternalWithoutCondition(queryState, options, 
queryStartNanoTime);
+        else    
+            executeWithoutConditions(getMutations(options, local, now, 
queryStartNanoTime), options.getConsistency(), queryStartNanoTime);
+
         return new ResultMessage.Void();
     }
 
@@ -482,16 +508,18 @@ public class BatchStatement implements CQLStatement
 
     public ResultMessage executeInternal(QueryState queryState, QueryOptions 
options) throws RequestValidationException, RequestExecutionException
     {
+        BatchQueryOptions batchOptions = 
BatchQueryOptions.withoutPerStatementVariables(options);
+
         if (hasConditions)
-            return 
executeInternalWithConditions(BatchQueryOptions.withoutPerStatementVariables(options),
 queryState);
+            return executeInternalWithConditions(batchOptions, queryState);
 
-        executeInternalWithoutCondition(queryState, options, 
System.nanoTime());
+        executeInternalWithoutCondition(queryState, batchOptions, 
System.nanoTime());
         return new ResultMessage.Void();
     }
 
-    private ResultMessage executeInternalWithoutCondition(QueryState 
queryState, QueryOptions options, long queryStartNanoTime) throws 
RequestValidationException, RequestExecutionException
+    private ResultMessage executeInternalWithoutCondition(QueryState 
queryState, BatchQueryOptions batchOptions, long queryStartNanoTime) throws 
RequestValidationException, RequestExecutionException
     {
-        for (IMutation mutation : 
getMutations(BatchQueryOptions.withoutPerStatementVariables(options), true, 
queryState.getTimestamp(), queryStartNanoTime))
+        for (IMutation mutation : getMutations(batchOptions, true, 
queryState.getTimestamp(), queryStartNanoTime))
             mutation.apply();
         return null;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java 
b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
index 9671b02..96d9f5a 100644
--- a/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
+++ b/src/java/org/apache/cassandra/cql3/statements/BatchUpdatesCollector.java
@@ -19,10 +19,10 @@ package org.apache.cassandra.cql3.statements;
 
 import java.nio.ByteBuffer;
 import java.util.*;
-import java.util.stream.Collectors;
 
 import com.google.common.collect.ImmutableMap;
 
+import org.apache.cassandra.db.virtual.VirtualMutation;
 import org.apache.cassandra.schema.TableId;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.db.*;
@@ -84,15 +84,20 @@ final class BatchUpdatesCollector implements 
UpdatesCollector
 
     private IMutationBuilder getMutationBuilder(TableMetadata metadata, 
DecoratedKey dk, ConsistencyLevel consistency)
     {
-        String ksName = metadata.keyspace;
-        IMutationBuilder mutationBuilder = 
keyspaceMap(ksName).get(dk.getKey());
-        if (mutationBuilder == null)
+        return keyspaceMap(metadata.keyspace).computeIfAbsent(dk.getKey(), k 
-> makeMutationBuilder(metadata, dk, consistency));
+    }
+
+    private IMutationBuilder makeMutationBuilder(TableMetadata metadata, 
DecoratedKey partitionKey, ConsistencyLevel cl)
+    {
+        if (metadata.isVirtual())
         {
-            MutationBuilder builder = new MutationBuilder(ksName, dk);
-            mutationBuilder = metadata.isCounter() ? new 
CounterMutationBuilder(builder, consistency) : builder;
-            keyspaceMap(ksName).put(dk.getKey(), mutationBuilder);
+            return new VirtualMutationBuilder(metadata.keyspace, partitionKey);
+        }
+        else
+        {
+            MutationBuilder builder = new MutationBuilder(metadata.keyspace, 
partitionKey);
+            return metadata.isCounter() ? new CounterMutationBuilder(builder, 
cl) : builder;
         }
-        return mutationBuilder;
     }
 
     /**
@@ -228,4 +233,41 @@ final class BatchUpdatesCollector implements 
UpdatesCollector
             return mutationBuilder.get(id);
         }
     }
+
+    private static class VirtualMutationBuilder implements IMutationBuilder
+    {
+        private final String keyspaceName;
+        private final DecoratedKey partitionKey;
+
+        private final HashMap<TableId, PartitionUpdate.Builder> modifications 
= new HashMap<>();
+
+        private VirtualMutationBuilder(String keyspaceName, DecoratedKey 
partitionKey)
+        {
+            this.keyspaceName = keyspaceName;
+            this.partitionKey = partitionKey;
+        }
+
+        @Override
+        public VirtualMutationBuilder add(PartitionUpdate.Builder builder)
+        {
+            PartitionUpdate.Builder prev = 
modifications.put(builder.metadata().id, builder);
+            if (null != prev)
+                throw new IllegalStateException();
+            return this;
+        }
+
+        @Override
+        public VirtualMutation build()
+        {
+            ImmutableMap.Builder<TableId, PartitionUpdate> updates = new 
ImmutableMap.Builder<>();
+            modifications.forEach((tableId, updateBuilder) -> 
updates.put(tableId, updateBuilder.build()));
+            return new VirtualMutation(keyspaceName, partitionKey, 
updates.build());
+        }
+
+        @Override
+        public PartitionUpdate.Builder get(TableId tableId)
+        {
+            return modifications.get(tableId);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java 
b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
index eb6aa70..7953c8b 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CQL3CasRequest.java
@@ -22,6 +22,7 @@ import java.util.*;
 
 import com.google.common.collect.*;
 
+import org.apache.cassandra.index.IndexRegistry;
 import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.conditions.ColumnCondition;
@@ -183,7 +184,7 @@ public class CQL3CasRequest implements CASRequest
         return conditionColumns;
     }
 
-    public SinglePartitionReadCommand readCommand(int nowInSec)
+    public SinglePartitionReadQuery readCommand(int nowInSec)
     {
         assert staticConditions != null || !conditions.isEmpty();
 
@@ -193,16 +194,16 @@ public class CQL3CasRequest implements CASRequest
         // With only a static condition, we still want to make the distinction 
between a non-existing partition and one
         // that exists (has some live data) but has not static content. So we 
query the first live row of the partition.
         if (conditions.isEmpty())
-            return SinglePartitionReadCommand.create(metadata,
-                                                     nowInSec,
-                                                     columnFilter,
-                                                     RowFilter.NONE,
-                                                     DataLimits.cqlLimits(1),
-                                                     key,
-                                                     new 
ClusteringIndexSliceFilter(Slices.ALL, false));
+            return SinglePartitionReadQuery.create(metadata,
+                                                   nowInSec,
+                                                   columnFilter,
+                                                   RowFilter.NONE,
+                                                   DataLimits.cqlLimits(1),
+                                                   key,
+                                                   new 
ClusteringIndexSliceFilter(Slices.ALL, false));
 
         ClusteringIndexNamesFilter filter = new 
ClusteringIndexNamesFilter(conditions.navigableKeySet(), false);
-        return SinglePartitionReadCommand.create(metadata, nowInSec, key, 
columnFilter, filter);
+        return SinglePartitionReadQuery.create(metadata, nowInSec, key, 
columnFilter, filter);
     }
 
     /**
@@ -244,7 +245,7 @@ public class CQL3CasRequest implements CASRequest
             upd.applyUpdates(current, updateBuilder);
 
         PartitionUpdate partitionUpdate = updateBuilder.build();
-        
Keyspace.openAndGetStore(metadata).indexManager.validate(partitionUpdate);
+        IndexRegistry.obtain(metadata).validate(partitionUpdate);
 
         return partitionUpdate;
     }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
index d3ef599..e428087 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/CreateAggregateStatement.java
@@ -26,6 +26,7 @@ import java.util.List;
 import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.cql3.*;
 import org.apache.cassandra.cql3.functions.*;
@@ -206,8 +207,12 @@ public final class CreateAggregateStatement extends 
SchemaAlteringStatement
         if (ifNotExists && orReplace)
             throw new InvalidRequestException("Cannot use both 'OR REPLACE' 
and 'IF NOT EXISTS' directives");
 
-        if (Schema.instance.getKeyspaceMetadata(functionName.keyspace) == null)
+
+        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(functionName.keyspace);
+        if (null == ksm)
             throw new InvalidRequestException(String.format("Cannot add 
aggregate '%s' to non existing keyspace '%s'.", functionName.name, 
functionName.keyspace));
+        if (ksm.isVirtual())
+            throw new InvalidRequestException("Cannot create aggregates in 
virtual keyspaces");
     }
 
     public Event.SchemaChange announceMigration(QueryState queryState, boolean 
isLocalOnly) throws RequestValidationException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
index 1f0e703..c380991 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateFunctionStatement.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.*;
 import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.cql3.CQL3Type;
 import org.apache.cassandra.cql3.ColumnIdentifier;
@@ -136,8 +137,11 @@ public final class CreateFunctionStatement extends 
SchemaAlteringStatement
         if (ifNotExists && orReplace)
             throw new InvalidRequestException("Cannot use both 'OR REPLACE' 
and 'IF NOT EXISTS' directives");
 
-        if (Schema.instance.getKeyspaceMetadata(functionName.keyspace) == null)
+        KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(functionName.keyspace);
+        if (null == ksm)
             throw new InvalidRequestException(String.format("Cannot add 
function '%s' to non existing keyspace '%s'.", functionName.name, 
functionName.keyspace));
+        if (ksm.isVirtual())
+            throw new InvalidRequestException("Cannot create functions in 
virtual keyspaces");
     }
 
     public Event.SchemaChange announceMigration(QueryState queryState, boolean 
isLocalOnly) throws RequestValidationException

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
index 9d0a714..778c4a3 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateIndexStatement.java
@@ -82,6 +82,9 @@ public class CreateIndexStatement extends 
SchemaAlteringStatement
     {
         TableMetadata table = Schema.instance.validateTable(keyspace(), 
columnFamily());
 
+        if (table.isVirtual())
+            throw new InvalidRequestException("Secondary indexes are not 
supported on virtual tables");
+
         if (table.isCounter())
             throw new InvalidRequestException("Secondary indexes are not 
supported on counter tables");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
index 55249c4..7c639e2 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTableStatement.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.cql3.statements;
 import java.nio.ByteBuffer;
 import java.util.*;
 import java.util.regex.Pattern;
+
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.Multiset;
 import org.apache.commons.lang3.StringUtils;
@@ -134,7 +135,6 @@ public class CreateTableStatement extends 
SchemaAlteringStatement
                .isCompound(isCompound)
                .isCounter(hasCounters)
                .isSuper(false)
-               .isView(false)
                .params(params);
 
         for (int i = 0; i < keyAliases.size(); i++)
@@ -213,6 +213,9 @@ public class CreateTableStatement extends 
SchemaAlteringStatement
             KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(keyspace());
             if (ksm == null)
                 throw new ConfigurationException(String.format("Keyspace %s 
doesn't exist", keyspace()));
+            if (ksm.isVirtual())
+                throw new InvalidRequestException("Cannot create tables in 
virtual keyspaces");
+
             return prepare(ksm.types);
         }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java
index d57bff7..f2cd217 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTriggerStatement.java
@@ -62,6 +62,8 @@ public class CreateTriggerStatement extends 
SchemaAlteringStatement
     public void validate(ClientState state) throws RequestValidationException
     {
         TableMetadata metadata = Schema.instance.validateTable(keyspace(), 
columnFamily());
+        if (metadata.isVirtual())
+            throw new InvalidRequestException("Cannot CREATE TRIGGER against a 
virtual table");
         if (metadata.isView())
             throw new InvalidRequestException("Cannot CREATE TRIGGER against a 
materialized view");
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
index 7dce478..1a0da4c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateTypeStatement.java
@@ -73,6 +73,8 @@ public class CreateTypeStatement extends 
SchemaAlteringStatement
         KeyspaceMetadata ksm = 
Schema.instance.getKeyspaceMetadata(name.getKeyspace());
         if (ksm == null)
             throw new InvalidRequestException(String.format("Cannot add type 
in unknown keyspace %s", name.getKeyspace()));
+        if (ksm.isVirtual())
+            throw new InvalidRequestException("Cannot create types in virtual 
keyspaces");
 
         if (ksm.types.get(name.getUserTypeName()).isPresent() && !ifNotExists)
             throw new InvalidRequestException(String.format("A user type of 
name %s already exists", name));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
index 01ed6fe..b50a552 100644
--- a/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/CreateViewStatement.java
@@ -151,6 +151,8 @@ public class CreateViewStatement extends 
SchemaAlteringStatement
 
         TableMetadata metadata = 
Schema.instance.validateTable(baseName.getKeyspace(), 
baseName.getColumnFamily());
 
+        if (metadata.isVirtual())
+            throw new InvalidRequestException("Materialized views are not 
supported on virtual tables");
         if (metadata.isCounter())
             throw new InvalidRequestException("Materialized views are not 
supported on counter tables");
         if (metadata.isView())
@@ -317,7 +319,7 @@ public class CreateViewStatement extends 
SchemaAlteringStatement
 
         TableMetadata.Builder builder =
             TableMetadata.builder(keyspace(), columnFamily(), 
properties.properties.getId())
-                         .isView(true)
+                         .kind(TableMetadata.Kind.VIEW)
                          .params(params);
 
         add(metadata, targetPartitionKeys, builder::addPartitionKeyColumn);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
index af04572..639286c 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DeleteStatement.java
@@ -144,6 +144,8 @@ public class DeleteStatement extends ModificationStatement
                                                         Conditions conditions,
                                                         Attributes attrs)
         {
+            checkFalse(metadata.isVirtual(), "Virtual tables don't support 
DELETE statements");
+
             Operations operations = new Operations(type);
 
             for (Operation.RawDeletion deletion : deletions)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java
index cfc6564..2f302c0 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropKeyspaceStatement.java
@@ -19,6 +19,7 @@ package org.apache.cassandra.cql3.statements;
 
 import org.apache.cassandra.audit.AuditLogEntryType;
 import org.apache.cassandra.auth.Permission;
+import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry;
 import org.apache.cassandra.exceptions.ConfigurationException;
 import org.apache.cassandra.exceptions.InvalidRequestException;
 import org.apache.cassandra.exceptions.RequestValidationException;
@@ -61,6 +62,9 @@ public class DropKeyspaceStatement extends 
SchemaAlteringStatement
 
     public Event.SchemaChange announceMigration(QueryState queryState, boolean 
isLocalOnly) throws ConfigurationException
     {
+        if (null != 
VirtualKeyspaceRegistry.instance.getKeyspaceNullable(keyspace))
+            throw new InvalidRequestException("Cannot drop virtual keyspaces");
+
         try
         {
             MigrationManager.announceKeyspaceDrop(keyspace, isLocalOnly);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java
index d7801e5..beb1002 100644
--- a/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/DropTableStatement.java
@@ -75,6 +75,9 @@ public class DropTableStatement extends 
SchemaAlteringStatement
                 if (metadata.isView())
                     throw new InvalidRequestException("Cannot use DROP TABLE 
on Materialized View");
 
+                if (metadata.isVirtual())
+                    throw new InvalidRequestException("Cannot drop virtual 
tables");
+
                 boolean rejectDrop = false;
                 StringBuilder messageBuilder = new StringBuilder();
                 for (ViewMetadata def : ksm.views)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
index 31aa80c..e02fd41 100644
--- a/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -203,6 +203,11 @@ public abstract class ModificationStatement implements 
CQLStatement
         return metadata().isView();
     }
 
+    public boolean isVirtual()
+    {
+        return metadata().isVirtual();
+    }
+
     public long getTimestamp(long now, QueryOptions options) throws 
InvalidRequestException
     {
         return attrs.getTimestamp(now, options);
@@ -248,6 +253,8 @@ public abstract class ModificationStatement implements 
CQLStatement
         checkFalse(isCounter() && attrs.isTimestampSet(), "Cannot provide 
custom timestamp for counter updates");
         checkFalse(isCounter() && attrs.isTimeToLiveSet(), "Cannot provide 
custom TTL for counter updates");
         checkFalse(isView(), "Cannot directly modify a materialized view");
+        checkFalse(isVirtual() && attrs.isTimeToLiveSet(), "Expiring columns 
are not supported by virtual tables");
+        checkFalse(isVirtual() && hasConditions(), "Conditional updates are 
not supported by virtual tables");
     }
 
     public RegularAndStaticColumns updatedColumns()
@@ -440,6 +447,9 @@ public abstract class ModificationStatement implements 
CQLStatement
     private ResultMessage executeWithoutCondition(QueryState queryState, 
QueryOptions options, long queryStartNanoTime)
     throws RequestExecutionException, RequestValidationException
     {
+        if (isVirtual())
+            return executeInternalWithoutCondition(queryState, options, 
queryStartNanoTime);
+
         ConsistencyLevel cl = options.getConsistency();
         if (isCounter())
             cl.validateCounterForWrite(metadata());
@@ -613,7 +623,7 @@ public abstract class ModificationStatement implements 
CQLStatement
     {
         UUID ballot = UUIDGen.getTimeUUIDFromMicros(state.getTimestamp());
 
-        SinglePartitionReadCommand readCommand = 
request.readCommand(FBUtilities.nowInSeconds());
+        SinglePartitionReadQuery readCommand = 
request.readCommand(FBUtilities.nowInSeconds());
         FilteredPartition current;
         try (ReadExecutionController executionController = 
readCommand.executionController();
              PartitionIterator iter = 
readCommand.executeInternal(executionController))

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 7fa9964..ef3db51 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -55,7 +55,7 @@ import org.apache.cassandra.db.rows.RowIterator;
 import org.apache.cassandra.db.view.View;
 import org.apache.cassandra.dht.AbstractBounds;
 import org.apache.cassandra.exceptions.*;
-import org.apache.cassandra.index.SecondaryIndexManager;
+import org.apache.cassandra.index.IndexRegistry;
 import org.apache.cassandra.serializers.MarshalException;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.ClientWarn;
@@ -458,7 +458,7 @@ public class SelectStatement implements CQLStatement
     {
         QueryPager pager = query.getPager(options.getPagingState(), 
options.getProtocolVersion());
 
-        if (aggregationSpec == null || query == ReadQuery.EMPTY)
+        if (aggregationSpec == null || query.isEmpty())
             return pager;
 
         return new AggregationQueryPager(pager, query.limits());
@@ -501,25 +501,22 @@ public class SelectStatement implements CQLStatement
     {
         Collection<ByteBuffer> keys = restrictions.getPartitionKeys(options);
         if (keys.isEmpty())
-            return ReadQuery.EMPTY;
+            return ReadQuery.empty(table);
 
         ClusteringIndexFilter filter = makeClusteringIndexFilter(options, 
columnFilter);
         if (filter == null)
-            return ReadQuery.EMPTY;
+            return ReadQuery.empty(table);
 
         RowFilter rowFilter = getRowFilter(options);
 
-        // Note that we use the total limit for every key, which is 
potentially inefficient.
-        // However, IN + LIMIT is not a very sensible choice.
-        List<SinglePartitionReadCommand> commands = new 
ArrayList<>(keys.size());
+        List<DecoratedKey> decoratedKeys = new ArrayList<>(keys.size());
         for (ByteBuffer key : keys)
         {
             QueryProcessor.validateKey(key);
-            DecoratedKey dk = 
table.partitioner.decorateKey(ByteBufferUtil.clone(key));
-            commands.add(SinglePartitionReadCommand.create(table, nowInSec, 
columnFilter, rowFilter, limit, dk, filter));
+            
decoratedKeys.add(table.partitioner.decorateKey(ByteBufferUtil.clone(key)));
         }
 
-        return new SinglePartitionReadCommand.Group(commands, limit);
+        return SinglePartitionReadQuery.createGroup(table, nowInSec, 
columnFilter, rowFilter, limit, decoratedKeys, filter);
     }
 
     /**
@@ -569,7 +566,7 @@ public class SelectStatement implements CQLStatement
     {
         ClusteringIndexFilter clusteringIndexFilter = 
makeClusteringIndexFilter(options, columnFilter);
         if (clusteringIndexFilter == null)
-            return ReadQuery.EMPTY;
+            return ReadQuery.empty(table);
 
         RowFilter rowFilter = getRowFilter(options);
 
@@ -577,10 +574,10 @@ public class SelectStatement implements CQLStatement
         // We want to have getRangeSlice to count the number of columns, not 
the number of keys.
         AbstractBounds<PartitionPosition> keyBounds = 
restrictions.getPartitionKeyBounds(options);
         if (keyBounds == null)
-            return ReadQuery.EMPTY;
+            return ReadQuery.empty(table);
 
-        PartitionRangeReadCommand command =
-            PartitionRangeReadCommand.create(table, nowInSec, columnFilter, 
rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter));
+        ReadQuery command =
+            PartitionRangeReadQuery.create(table, nowInSec, columnFilter, 
rowFilter, limit, new DataRange(keyBounds, clusteringIndexFilter));
 
         // If there's a secondary index that the command can use, have it 
validate the request parameters.
         command.maybeValidateIndex();
@@ -755,10 +752,8 @@ public class SelectStatement implements CQLStatement
      */
     public RowFilter getRowFilter(QueryOptions options) throws 
InvalidRequestException
     {
-        ColumnFamilyStore cfs = 
Keyspace.open(keyspace()).getColumnFamilyStore(columnFamily());
-        SecondaryIndexManager secondaryIndexManager = cfs.indexManager;
-        RowFilter filter = restrictions.getRowFilter(secondaryIndexManager, 
options);
-        return filter;
+        IndexRegistry indexRegistry = IndexRegistry.obtain(table);
+        return restrictions.getRowFilter(indexRegistry, options);
     }
 
     private ResultSet process(PartitionIterator partitions,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
 
b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
index 9eaf897..1def3fd 100644
--- 
a/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
+++ 
b/src/java/org/apache/cassandra/cql3/statements/SingleTableUpdatesCollector.java
@@ -31,6 +31,7 @@ import org.apache.cassandra.db.IMutation;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.RegularAndStaticColumns;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
+import org.apache.cassandra.db.virtual.VirtualMutation;
 import org.apache.cassandra.schema.TableMetadata;
 
 /**
@@ -86,12 +87,15 @@ final class SingleTableUpdatesCollector implements 
UpdatesCollector
         List<IMutation> ms = new ArrayList<>();
         for (PartitionUpdate.Builder builder : puBuilders.values())
         {
-            IMutation mutation = null;
+            IMutation mutation;
 
-            if (metadata.isCounter())
+            if (metadata.isVirtual())
+                mutation = new VirtualMutation(builder.build());
+            else if (metadata.isCounter())
                 mutation = new CounterMutation(new Mutation(builder.build()), 
counterConsistencyLevel);
             else
                 mutation = new Mutation(builder.build());
+
             mutation.validateIndexedColumns();
             ms.add(mutation);
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
----------------------------------------------------------------------
diff --git 
a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java 
b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
index 5d09cfa..d41a814 100644
--- a/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/TruncateStatement.java
@@ -69,6 +69,9 @@ public class TruncateStatement extends CFStatement implements 
CQLStatement
             if (metaData.isView())
                 throw new InvalidRequestException("Cannot TRUNCATE 
materialized view directly; must truncate base table instead");
 
+            if (metaData.isVirtual())
+                throw new InvalidRequestException("Cannot truncate virtual 
tables");
+
             StorageProxy.truncateBlocking(keyspace(), columnFamily());
         }
         catch (UnavailableException | TimeoutException e)
@@ -82,6 +85,13 @@ public class TruncateStatement extends CFStatement 
implements CQLStatement
     {
         try
         {
+            TableMetadata metaData = 
Schema.instance.getTableMetadata(keyspace(), columnFamily());
+            if (metaData.isView())
+                throw new InvalidRequestException("Cannot TRUNCATE 
materialized view directly; must truncate base table instead");
+
+            if (metaData.isVirtual())
+                throw new InvalidRequestException("Cannot truncate virtual 
tables");
+
             ColumnFamilyStore cfs = 
Keyspace.open(keyspace()).getColumnFamilyStore(columnFamily());
             cfs.truncateBlocking();
         }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/AbstractReadQuery.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/AbstractReadQuery.java 
b/src/java/org/apache/cassandra/db/AbstractReadQuery.java
new file mode 100644
index 0000000..c6ec329
--- /dev/null
+++ b/src/java/org/apache/cassandra/db/AbstractReadQuery.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.cassandra.db;
+
+import org.apache.cassandra.db.filter.ColumnFilter;
+import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.filter.RowFilter;
+import org.apache.cassandra.db.monitoring.MonitorableImpl;
+import org.apache.cassandra.db.partitions.PartitionIterator;
+import org.apache.cassandra.db.partitions.UnfilteredPartitionIterators;
+import org.apache.cassandra.schema.TableMetadata;
+
+/**
+ * Base class for {@code ReadQuery} implementations.
+ */
+abstract class AbstractReadQuery extends MonitorableImpl implements ReadQuery
+{
+    private final TableMetadata metadata;
+    private final int nowInSec;
+
+    private final ColumnFilter columnFilter;
+    private final RowFilter rowFilter;
+    private final DataLimits limits;
+
+    protected AbstractReadQuery(TableMetadata metadata, int nowInSec, 
ColumnFilter columnFilter, RowFilter rowFilter, DataLimits limits)
+    {
+        this.metadata = metadata;
+        this.nowInSec = nowInSec;
+        this.columnFilter = columnFilter;
+        this.rowFilter = rowFilter;
+        this.limits = limits;
+    }
+
+    @Override
+    public TableMetadata metadata()
+    {
+        return metadata;
+    }
+
+    // Monitorable interface
+    public String name()
+    {
+        return toCQLString();
+    }
+
+    @Override
+    public PartitionIterator executeInternal(ReadExecutionController 
controller)
+    {
+        return UnfilteredPartitionIterators.filter(executeLocally(controller), 
nowInSec());
+    }
+
+    @Override
+    public DataLimits limits()
+    {
+        return limits;
+    }
+
+    @Override
+    public int nowInSec()
+    {
+        return nowInSec;
+    }
+
+    @Override
+    public RowFilter rowFilter()
+    {
+        return rowFilter;
+    }
+
+    @Override
+    public ColumnFilter columnFilter()
+    {
+        return columnFilter;
+    }
+
+    /**
+     * Recreate the CQL string corresponding to this query.
+     * <p>
+     * Note that in general the returned string will not be exactly the 
original user string, first
+     * because there isn't always a single syntax for a given query,  but also 
because we don't have
+     * all the information needed (we know the non-PK columns queried but not 
the PK ones as internally
+     * we query them all). So this shouldn't be relied too strongly, but this 
should be good enough for
+     * debugging purpose which is what this is for.
+     */
+    public String toCQLString()
+    {
+        StringBuilder sb = new StringBuilder().append("SELECT ")
+                                              .append(columnFilter())
+                                              .append(" FROM ")
+                                              .append(metadata().keyspace)
+                                              .append('.')
+                                              .append(metadata().name);
+        appendCQLWhereClause(sb);
+
+        if (limits() != DataLimits.NONE)
+            sb.append(' ').append(limits());
+        return sb.toString();
+    }
+
+    protected abstract void appendCQLWhereClause(StringBuilder sb);
+}

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/Keyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/Keyspace.java 
b/src/java/org/apache/cassandra/db/Keyspace.java
index 651d156..50720f4 100644
--- a/src/java/org/apache/cassandra/db/Keyspace.java
+++ b/src/java/org/apache/cassandra/db/Keyspace.java
@@ -327,6 +327,8 @@ public class Keyspace
     {
         metadata = Schema.instance.getKeyspaceMetadata(keyspaceName);
         assert metadata != null : "Unknown keyspace " + keyspaceName;
+        if (metadata.isVirtual())
+            throw new IllegalStateException("Cannot initialize Keyspace with 
virtual metadata " + keyspaceName);
         createReplicationStrategy(metadata);
 
         this.metric = new KeyspaceMetrics(this);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java 
b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
index 27d7d4e..a6641d4 100644
--- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
+++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
@@ -45,15 +45,13 @@ import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.schema.IndexMetadata;
 import org.apache.cassandra.service.ClientState;
 import org.apache.cassandra.service.StorageProxy;
-import org.apache.cassandra.service.pager.*;
 import org.apache.cassandra.tracing.Tracing;
-import org.apache.cassandra.transport.ProtocolVersion;
 import org.apache.cassandra.utils.FBUtilities;
 
 /**
  * A read command that selects a (part of a) range of partitions.
  */
-public class PartitionRangeReadCommand extends ReadCommand
+public class PartitionRangeReadCommand extends ReadCommand implements 
PartitionRangeReadQuery
 {
     protected static final SelectionDeserializer selectionDeserializer = new 
Deserializer();
 
@@ -187,7 +185,8 @@ public class PartitionRangeReadCommand extends ReadCommand
                                              indexMetadata());
     }
 
-    public ReadCommand withUpdatedLimit(DataLimits newLimits)
+    @Override
+    public PartitionRangeReadCommand withUpdatedLimit(DataLimits newLimits)
     {
         return new PartitionRangeReadCommand(isDigestQuery(),
                                              digestVersion(),
@@ -200,6 +199,7 @@ public class PartitionRangeReadCommand extends ReadCommand
                                              indexMetadata());
     }
 
+    @Override
     public PartitionRangeReadCommand withUpdatedLimitsAndDataRange(DataLimits 
newLimits, DataRange newDataRange)
     {
         return new PartitionRangeReadCommand(isDigestQuery(),
@@ -218,34 +218,11 @@ public class PartitionRangeReadCommand extends ReadCommand
         return DatabaseDescriptor.getRangeRpcTimeout();
     }
 
-    public boolean selectsKey(DecoratedKey key)
-    {
-        if (!dataRange().contains(key))
-            return false;
-
-        return rowFilter().partitionKeyRestrictionsAreSatisfiedBy(key, 
metadata().partitionKeyType);
-    }
-
-    public boolean selectsClustering(DecoratedKey key, Clustering clustering)
-    {
-        if (clustering == Clustering.STATIC_CLUSTERING)
-            return !columnFilter().fetchedColumns().statics.isEmpty();
-
-        if (!dataRange().clusteringIndexFilter(key).selects(clustering))
-            return false;
-        return rowFilter().clusteringKeyRestrictionsAreSatisfiedBy(clustering);
-    }
-
     public PartitionIterator execute(ConsistencyLevel consistency, ClientState 
clientState, long queryStartNanoTime) throws RequestExecutionException
     {
         return StorageProxy.getRangeSlice(this, consistency, 
queryStartNanoTime);
     }
 
-    public QueryPager getPager(PagingState pagingState, ProtocolVersion 
protocolVersion)
-    {
-            return new PartitionRangeQueryPager(this, pagingState, 
protocolVersion);
-    }
-
     protected void recordLatency(TableMetrics metric, long latencyNanos)
     {
         metric.rangeLatency.addNano(latencyNanos);
@@ -389,13 +366,6 @@ public class PartitionRangeReadCommand extends ReadCommand
     }
 
     @Override
-    public boolean selectsFullPartition()
-    {
-        return metadata().isStaticCompactTable() ||
-               (dataRange.selectsAllPartition() && 
!rowFilter().hasExpressionOnClusteringOrRegularColumns());
-    }
-
-    @Override
     public String toString()
     {
         return String.format("Read(%s columns=%s rowfilter=%s limits=%s %s)",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to