http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java new file mode 100644 index 0000000..6750215 --- /dev/null +++ b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspace.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.virtual; + +import java.util.Collection; + +import com.google.common.collect.ImmutableCollection; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; + +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.Tables; + +public class VirtualKeyspace +{ + private final String name; + private final KeyspaceMetadata metadata; + + private final ImmutableCollection<VirtualTable> tables; + + public VirtualKeyspace(String name, Collection<VirtualTable> tables) + { + this.name = name; + this.tables = ImmutableList.copyOf(tables); + + metadata = KeyspaceMetadata.virtual(name, Tables.of(Iterables.transform(tables, VirtualTable::metadata))); + } + + public String name() + { + return name; + } + + public KeyspaceMetadata metadata() + { + return metadata; + } + + public ImmutableCollection<VirtualTable> tables() + { + return tables; + } +}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/VirtualKeyspaceRegistry.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualKeyspaceRegistry.java b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspaceRegistry.java new file mode 100644 index 0000000..5e0f90c --- /dev/null +++ b/src/java/org/apache/cassandra/db/virtual/VirtualKeyspaceRegistry.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.virtual; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import javax.annotation.Nullable; + +import com.google.common.collect.Iterables; + +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.schema.TableMetadata; + +public final class VirtualKeyspaceRegistry +{ + public static final VirtualKeyspaceRegistry instance = new VirtualKeyspaceRegistry(); + + private final Map<String, VirtualKeyspace> virtualKeyspaces = new ConcurrentHashMap<>(); + private final Map<TableId, VirtualTable> virtualTables = new ConcurrentHashMap<>(); + + private VirtualKeyspaceRegistry() + { + } + + public void register(VirtualKeyspace keyspace) + { + virtualKeyspaces.put(keyspace.name(), keyspace); + keyspace.tables().forEach(t -> virtualTables.put(t.metadata().id, t)); + } + + @Nullable + public VirtualKeyspace getKeyspaceNullable(String name) + { + return virtualKeyspaces.get(name); + } + + @Nullable + public VirtualTable getTableNullable(TableId id) + { + return virtualTables.get(id); + } + + @Nullable + public KeyspaceMetadata getKeyspaceMetadataNullable(String name) + { + VirtualKeyspace keyspace = virtualKeyspaces.get(name); + return null != keyspace ? keyspace.metadata() : null; + } + + @Nullable + public TableMetadata getTableMetadataNullable(TableId id) + { + VirtualTable table = virtualTables.get(id); + return null != table ? table.metadata() : null; + } + + public Iterable<KeyspaceMetadata> virtualKeyspacesMetadata() + { + return Iterables.transform(virtualKeyspaces.values(), VirtualKeyspace::metadata); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java b/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java new file mode 100644 index 0000000..dc32c8c --- /dev/null +++ b/src/java/org/apache/cassandra/db/virtual/VirtualMutation.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.virtual; + +import java.util.Collection; + +import com.google.common.base.MoreObjects; +import com.google.common.collect.ImmutableMap; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.IMutation; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.schema.TableId; + +/** + * A specialised IMutation implementation for virtual keyspaces. + * + * Mainly overrides {@link #apply()} to go straight to {@link VirtualTable#apply(PartitionUpdate)} for every table involved. + */ +public final class VirtualMutation implements IMutation +{ + private final String keyspaceName; + private final DecoratedKey partitionKey; + private final ImmutableMap<TableId, PartitionUpdate> modifications; + + public VirtualMutation(PartitionUpdate update) + { + this(update.metadata().keyspace, update.partitionKey(), ImmutableMap.of(update.metadata().id, update)); + } + + public VirtualMutation(String keyspaceName, DecoratedKey partitionKey, ImmutableMap<TableId, PartitionUpdate> modifications) + { + this.keyspaceName = keyspaceName; + this.partitionKey = partitionKey; + this.modifications = modifications; + } + + @Override + public void apply() + { + modifications.forEach((id, update) -> VirtualKeyspaceRegistry.instance.getTableNullable(id).apply(update)); + } + + @Override + public String getKeyspaceName() + { + return keyspaceName; + } + + @Override + public Collection<TableId> getTableIds() + { + return modifications.keySet(); + } + + @Override + public DecoratedKey key() + { + return partitionKey; + } + + @Override + public long getTimeout() + { + return DatabaseDescriptor.getWriteRpcTimeout(); + } + + @Override + public String toString(boolean shallow) + { + MoreObjects.ToStringHelper helper = + MoreObjects.toStringHelper(this) + .add("keyspace", keyspaceName) + .add("partition key", partitionKey); + + if (shallow) + helper.add("tables", getTableIds()); + else + helper.add("modifications", getPartitionUpdates()); + + return helper.toString(); + } + + @Override + public Collection<PartitionUpdate> getPartitionUpdates() + { + return modifications.values(); + } + + @Override + public void validateIndexedColumns() + { + // no-op + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/VirtualSchemaKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualSchemaKeyspace.java b/src/java/org/apache/cassandra/db/virtual/VirtualSchemaKeyspace.java new file mode 100644 index 0000000..299cc00 --- /dev/null +++ b/src/java/org/apache/cassandra/db/virtual/VirtualSchemaKeyspace.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.virtual; + +import com.google.common.collect.ImmutableList; + +import org.apache.cassandra.db.marshal.BytesType; +import org.apache.cassandra.db.marshal.Int32Type; +import org.apache.cassandra.db.marshal.UTF8Type; +import org.apache.cassandra.schema.ColumnMetadata; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableMetadata; + +import static org.apache.cassandra.schema.TableMetadata.builder; + +public final class VirtualSchemaKeyspace extends VirtualKeyspace +{ + private static final String NAME = "system_virtual_schema"; + + public static final VirtualSchemaKeyspace instance = new VirtualSchemaKeyspace(); + + private VirtualSchemaKeyspace() + { + super(NAME, ImmutableList.of(new VirtualKeyspaces(NAME), new VirtualTables(NAME), new VirtualColumns(NAME))); + } + + private static final class VirtualKeyspaces extends AbstractVirtualTable + { + private static final String KEYSPACE_NAME = "keyspace_name"; + + private VirtualKeyspaces(String keyspace) + { + super(builder(keyspace, "keyspaces") + .comment("virtual keyspace definitions") + .kind(TableMetadata.Kind.VIRTUAL) + .addPartitionKeyColumn(KEYSPACE_NAME, UTF8Type.instance) + .build()); + } + + public DataSet data() + { + SimpleDataSet result = new SimpleDataSet(metadata()); + for (KeyspaceMetadata keyspace : VirtualKeyspaceRegistry.instance.virtualKeyspacesMetadata()) + result.row(keyspace.name); + return result; + } + } + + private static final class VirtualTables extends AbstractVirtualTable + { + private static final String KEYSPACE_NAME = "keyspace_name"; + private static final String TABLE_NAME = "table_name"; + private static final String COMMENT = "comment"; + + private VirtualTables(String keyspace) + { + super(builder(keyspace, "tables") + .comment("virtual table definitions") + .kind(TableMetadata.Kind.VIRTUAL) + .addPartitionKeyColumn(KEYSPACE_NAME, UTF8Type.instance) + .addClusteringColumn(TABLE_NAME, UTF8Type.instance) + .addRegularColumn(COMMENT, UTF8Type.instance) + .build()); + } + + public DataSet data() + { + SimpleDataSet result = new SimpleDataSet(metadata()); + + for (KeyspaceMetadata keyspace : VirtualKeyspaceRegistry.instance.virtualKeyspacesMetadata()) + { + for (TableMetadata table : keyspace.tables) + { + result.row(table.keyspace, table.name) + .column(COMMENT, table.params.comment); + } + } + + return result; + } + } + + private static final class VirtualColumns extends AbstractVirtualTable + { + private static final String KEYSPACE_NAME = "keyspace_name"; + private static final String TABLE_NAME = "table_name"; + private static final String COLUMN_NAME = "column_name"; + private static final String CLUSTERING_ORDER = "clustering_order"; + private static final String COLUMN_NAME_BYTES = "column_name_bytes"; + private static final String KIND = "kind"; + private static final String POSITION = "position"; + private static final String TYPE = "type"; + + private VirtualColumns(String keyspace) + { + super(builder(keyspace, "columns") + .comment("virtual column definitions") + .kind(TableMetadata.Kind.VIRTUAL) + .addPartitionKeyColumn(KEYSPACE_NAME, UTF8Type.instance) + .addClusteringColumn(TABLE_NAME, UTF8Type.instance) + .addClusteringColumn(COLUMN_NAME, UTF8Type.instance) + .addRegularColumn(CLUSTERING_ORDER, UTF8Type.instance) + .addRegularColumn(COLUMN_NAME_BYTES, BytesType.instance) + .addRegularColumn(KIND, UTF8Type.instance) + .addRegularColumn(POSITION, Int32Type.instance) + .addRegularColumn(TYPE, UTF8Type.instance) + .build()); + } + + public DataSet data() + { + SimpleDataSet result = new SimpleDataSet(metadata()); + + for (KeyspaceMetadata keyspace : VirtualKeyspaceRegistry.instance.virtualKeyspacesMetadata()) + { + for (TableMetadata table : keyspace.tables) + { + for (ColumnMetadata column : table.columns()) + { + result.row(column.ksName, column.cfName, column.name.toString()) + .column(CLUSTERING_ORDER, column.clusteringOrder().toString().toLowerCase()) + .column(COLUMN_NAME_BYTES, column.name.bytes) + .column(KIND, column.kind.toString().toLowerCase()) + .column(POSITION, column.position()) + .column(TYPE, column.type.asCQL3Type().toString()); + } + } + } + + return result; + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/db/virtual/VirtualTable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/virtual/VirtualTable.java b/src/java/org/apache/cassandra/db/virtual/VirtualTable.java new file mode 100644 index 0000000..ea196ca --- /dev/null +++ b/src/java/org/apache/cassandra/db/virtual/VirtualTable.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.db.virtual; + +import org.apache.cassandra.db.DataRange; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.filter.ClusteringIndexFilter; +import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.schema.TableMetadata; + +/** + * A system view used to expose system information. + */ +public interface VirtualTable +{ + /** + * Returns the view name. + * + * @return the view name. + */ + default String name() + { + return metadata().name; + } + + /** + * Returns the view metadata. + * + * @return the view metadata. + */ + TableMetadata metadata(); + + /** + * Applies the specified update. + * @param update the update to apply + */ + void apply(PartitionUpdate update); + + /** + * Selects the rows from a single partition. + * + * @param partitionKey the partition key + * @param clusteringIndexFilter the clustering columns to selected + * @param columnFilter the selected columns + * @return the rows corresponding to the requested data. + */ + UnfilteredPartitionIterator select(DecoratedKey partitionKey, ClusteringIndexFilter clusteringIndexFilter, ColumnFilter columnFilter); + + /** + * Selects the rows from a range of partitions. + * + * @param dataRange the range of data to retrieve + * @param columnFilter the selected columns + * @return the rows corresponding to the requested data. + */ + UnfilteredPartitionIterator select(DataRange dataRange, ColumnFilter columnFilter); +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/index/IndexRegistry.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/IndexRegistry.java b/src/java/org/apache/cassandra/index/IndexRegistry.java index 9f5ed02..e4c531b 100644 --- a/src/java/org/apache/cassandra/index/IndexRegistry.java +++ b/src/java/org/apache/cassandra/index/IndexRegistry.java @@ -21,8 +21,14 @@ package org.apache.cassandra.index; import java.util.Collection; +import java.util.Collections; +import java.util.Optional; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.filter.RowFilter; +import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.schema.IndexMetadata; +import org.apache.cassandra.schema.TableMetadata; /** * The collection of all Index instances for a base table. @@ -34,9 +40,72 @@ import org.apache.cassandra.schema.IndexMetadata; */ public interface IndexRegistry { + /** + * An empty {@code IndexRegistry} + */ + public static final IndexRegistry EMPTY = new IndexRegistry() + { + @Override + public void unregisterIndex(Index index) + { + } + + @Override + public void registerIndex(Index index) + { + } + + @Override + public Collection<Index> listIndexes() + { + return Collections.emptyList(); + } + + @Override + public Index getIndex(IndexMetadata indexMetadata) + { + return null; + } + + @Override + public Optional<Index> getBestIndexFor(RowFilter.Expression expression) + { + return Optional.empty(); + } + + @Override + public void validate(PartitionUpdate update) + { + } + }; + void registerIndex(Index index); void unregisterIndex(Index index); Index getIndex(IndexMetadata indexMetadata); Collection<Index> listIndexes(); + + Optional<Index> getBestIndexFor(RowFilter.Expression expression); + + /** + * Called at write time to ensure that values present in the update + * are valid according to the rules of all registered indexes which + * will process it. The partition key as well as the clustering and + * cell values for each row in the update may be checked by index + * implementations + * + * @param update PartitionUpdate containing the values to be validated by registered Index implementations + */ + void validate(PartitionUpdate update); + + /** + * Returns the {@code IndexRegistry} associated to the specified table. + * + * @param table the table metadata + * @return the {@code IndexRegistry} associated to the specified table + */ + public static IndexRegistry obtain(TableMetadata table) + { + return table.isVirtual() ? EMPTY : Keyspace.openAndGetStore(table).indexManager; + } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/index/internal/CassandraIndex.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java index 9a29c02..fb0d629 100644 --- a/src/java/org/apache/cassandra/index/internal/CassandraIndex.java +++ b/src/java/org/apache/cassandra/index/internal/CassandraIndex.java @@ -739,6 +739,7 @@ public abstract class CassandraIndex implements Index TableMetadata.Builder builder = TableMetadata.builder(baseCfsMetadata.keyspace, baseCfsMetadata.indexTableName(indexMetadata), baseCfsMetadata.id) + .kind(TableMetadata.Kind.INDEX) // tables for legacy KEYS indexes are non-compound and dense .isDense(indexMetadata.isKeys()) .isCompound(!indexMetadata.isKeys()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java index 80a3869..5a72d2c 100644 --- a/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java +++ b/src/java/org/apache/cassandra/schema/KeyspaceMetadata.java @@ -36,16 +36,23 @@ import static java.lang.String.format; */ public final class KeyspaceMetadata { + public enum Kind + { + REGULAR, VIRTUAL + } + public final String name; + public final Kind kind; public final KeyspaceParams params; public final Tables tables; public final Views views; public final Types types; public final Functions functions; - private KeyspaceMetadata(String name, KeyspaceParams params, Tables tables, Views views, Types types, Functions functions) + private KeyspaceMetadata(String name, Kind kind, KeyspaceParams params, Tables tables, Views views, Types types, Functions functions) { this.name = name; + this.kind = kind; this.params = params; this.tables = tables; this.views = views; @@ -55,42 +62,52 @@ public final class KeyspaceMetadata public static KeyspaceMetadata create(String name, KeyspaceParams params) { - return new KeyspaceMetadata(name, params, Tables.none(), Views.none(), Types.none(), Functions.none()); + return new KeyspaceMetadata(name, Kind.REGULAR, params, Tables.none(), Views.none(), Types.none(), Functions.none()); } public static KeyspaceMetadata create(String name, KeyspaceParams params, Tables tables) { - return new KeyspaceMetadata(name, params, tables, Views.none(), Types.none(), Functions.none()); + return new KeyspaceMetadata(name, Kind.REGULAR, params, tables, Views.none(), Types.none(), Functions.none()); } public static KeyspaceMetadata create(String name, KeyspaceParams params, Tables tables, Views views, Types types, Functions functions) { - return new KeyspaceMetadata(name, params, tables, views, types, functions); + return new KeyspaceMetadata(name, Kind.REGULAR, params, tables, views, types, functions); + } + + public static KeyspaceMetadata virtual(String name, Tables tables) + { + return new KeyspaceMetadata(name, Kind.VIRTUAL, KeyspaceParams.local(), tables, Views.none(), Types.none(), Functions.none()); } public KeyspaceMetadata withSwapped(KeyspaceParams params) { - return new KeyspaceMetadata(name, params, tables, views, types, functions); + return new KeyspaceMetadata(name, kind, params, tables, views, types, functions); } public KeyspaceMetadata withSwapped(Tables regular) { - return new KeyspaceMetadata(name, params, regular, views, types, functions); + return new KeyspaceMetadata(name, kind, params, regular, views, types, functions); } public KeyspaceMetadata withSwapped(Views views) { - return new KeyspaceMetadata(name, params, tables, views, types, functions); + return new KeyspaceMetadata(name, kind, params, tables, views, types, functions); } public KeyspaceMetadata withSwapped(Types types) { - return new KeyspaceMetadata(name, params, tables, views, types, functions); + return new KeyspaceMetadata(name, kind, params, tables, views, types, functions); } public KeyspaceMetadata withSwapped(Functions functions) { - return new KeyspaceMetadata(name, params, tables, views, types, functions); + return new KeyspaceMetadata(name, kind, params, tables, views, types, functions); + } + + public boolean isVirtual() + { + return kind == Kind.VIRTUAL; } public Iterable<TableMetadata> tablesAndViews() @@ -129,7 +146,7 @@ public final class KeyspaceMetadata @Override public int hashCode() { - return Objects.hashCode(name, params, tables, views, functions, types); + return Objects.hashCode(name, kind, params, tables, views, functions, types); } @Override @@ -144,6 +161,7 @@ public final class KeyspaceMetadata KeyspaceMetadata other = (KeyspaceMetadata) o; return name.equals(other.name) + && kind == other.kind && params.equals(other.params) && tables.equals(other.tables) && views.equals(other.views) @@ -156,6 +174,7 @@ public final class KeyspaceMetadata { return MoreObjects.toStringHelper(this) .add("name", name) + .add("kind", kind) .add("params", params) .add("tables", tables) .add("views", views) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/schema/Schema.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/Schema.java b/src/java/org/apache/cassandra/schema/Schema.java index 594b2ab..09ec62a 100644 --- a/src/java/org/apache/cassandra/schema/Schema.java +++ b/src/java/org/apache/cassandra/schema/Schema.java @@ -21,6 +21,7 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.stream.Collectors; +import javax.annotation.Nullable; import com.google.common.collect.ImmutableList; import com.google.common.collect.MapDifference; @@ -28,15 +29,12 @@ import com.google.common.collect.Sets; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.functions.*; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.db.KeyspaceNotDefinedException; -import org.apache.cassandra.db.Mutation; -import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.db.*; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.marshal.AbstractType; import org.apache.cassandra.db.marshal.UserType; +import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.UnknownTableException; @@ -312,7 +310,8 @@ public final class Schema public KeyspaceMetadata getKeyspaceMetadata(String keyspaceName) { assert keyspaceName != null; - return keyspaces.getNullable(keyspaceName); + KeyspaceMetadata keyspace = keyspaces.getNullable(keyspaceName); + return null != keyspace ? keyspace : VirtualKeyspaceRegistry.instance.getKeyspaceMetadataNullable(keyspaceName); } private Set<String> getNonSystemKeyspacesSet() @@ -426,15 +425,17 @@ public final class Schema assert keyspace != null; assert table != null; - KeyspaceMetadata ksm = keyspaces.getNullable(keyspace); + KeyspaceMetadata ksm = getKeyspaceMetadata(keyspace); return ksm == null ? null : ksm.getTableOrViewNullable(table); } + @Nullable public TableMetadata getTableMetadata(TableId id) { - return keyspaces.getTableOrViewNullable(id); + TableMetadata table = keyspaces.getTableOrViewNullable(id); + return null != table ? table : VirtualKeyspaceRegistry.instance.getTableMetadataNullable(id); } public TableMetadata validateTable(String keyspaceName, String tableName) @@ -442,7 +443,7 @@ public final class Schema if (tableName.isEmpty()) throw new InvalidRequestException("non-empty table is required"); - KeyspaceMetadata keyspace = keyspaces.getNullable(keyspaceName); + KeyspaceMetadata keyspace = getKeyspaceMetadata(keyspaceName); if (keyspace == null) throw new KeyspaceNotDefinedException(format("keyspace %s does not exist", keyspaceName)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/schema/SchemaKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java index 638e912..4945fc2 100644 --- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java +++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java @@ -1141,7 +1141,7 @@ public final class SchemaKeyspace TableMetadata metadata = TableMetadata.builder(keyspaceName, viewName, TableId.fromUUID(row.getUUID("id"))) - .isView(true) + .kind(TableMetadata.Kind.VIEW) .addColumns(columns) .droppedColumns(fetchDroppedColumns(keyspaceName, viewName)) .params(createTableParamsFromRow(row)) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/schema/TableMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/TableMetadata.java b/src/java/org/apache/cassandra/schema/TableMetadata.java index 4634438..47e5b47 100644 --- a/src/java/org/apache/cassandra/schema/TableMetadata.java +++ b/src/java/org/apache/cassandra/schema/TableMetadata.java @@ -21,6 +21,8 @@ import java.nio.ByteBuffer; import java.util.*; import java.util.Objects; +import javax.annotation.Nullable; + import com.google.common.base.MoreObjects; import com.google.common.collect.*; @@ -82,15 +84,21 @@ public final class TableMetadata } } + public enum Kind + { + REGULAR, INDEX, VIEW, VIRTUAL + } + public final String keyspace; public final String name; public final TableId id; public final IPartitioner partitioner; + public final Kind kind; public final TableParams params; public final ImmutableSet<Flag> flags; - private final boolean isView; + @Nullable private final String indexName; // derived from table name /* @@ -139,12 +147,10 @@ public final class TableMetadata id = builder.id; partitioner = builder.partitioner; + kind = builder.kind; params = builder.params.build(); - isView = builder.isView; - indexName = name.contains(".") - ? name.substring(name.indexOf('.') + 1) - : null; + indexName = kind == Kind.INDEX ? name.substring(name.indexOf('.') + 1) : null; droppedColumns = ImmutableMap.copyOf(builder.droppedColumns); Collections.sort(builder.partitionKeyColumns); @@ -184,23 +190,28 @@ public final class TableMetadata { return builder(keyspace, name, id) .partitioner(partitioner) + .kind(kind) .params(params) .flags(flags) - .isView(isView) .addColumns(columns()) .droppedColumns(droppedColumns) .indexes(indexes) .triggers(triggers); } + public boolean isIndex() + { + return kind == Kind.INDEX; + } + public boolean isView() { - return isView; + return kind == Kind.VIEW; } - public boolean isIndex() + public boolean isVirtual() { - return indexName != null; + return kind == Kind.VIRTUAL; } public Optional<String> indexName() @@ -534,7 +545,7 @@ public final class TableMetadata private void except(String format, Object... args) { - throw new ConfigurationException(keyspace + "." + name + ": " +format(format, args)); + throw new ConfigurationException(keyspace + "." + name + ": " + format(format, args)); } @Override @@ -552,9 +563,9 @@ public final class TableMetadata && name.equals(tm.name) && id.equals(tm.id) && partitioner.equals(tm.partitioner) + && kind == tm.kind && params.equals(tm.params) && flags.equals(tm.flags) - && isView == tm.isView && columns.equals(tm.columns) && droppedColumns.equals(tm.droppedColumns) && indexes.equals(tm.indexes) @@ -564,7 +575,7 @@ public final class TableMetadata @Override public int hashCode() { - return Objects.hash(keyspace, name, id, partitioner, params, flags, isView, columns, droppedColumns, indexes, triggers); + return Objects.hash(keyspace, name, id, partitioner, kind, params, flags, columns, droppedColumns, indexes, triggers); } @Override @@ -580,9 +591,9 @@ public final class TableMetadata .add("table", name) .add("id", id) .add("partitioner", partitioner) + .add("kind", kind) .add("params", params) .add("flags", flags) - .add("isView", isView) .add("columns", columns()) .add("droppedColumns", droppedColumns.values()) .add("indexes", indexes) @@ -598,6 +609,7 @@ public final class TableMetadata private TableId id; private IPartitioner partitioner; + private Kind kind = Kind.REGULAR; private TableParams.Builder params = TableParams.builder(); // Setting compound as default as "normal" CQL tables are compound and that's what we want by default @@ -611,8 +623,6 @@ public final class TableMetadata private final List<ColumnMetadata> clusteringColumns = new ArrayList<>(); private final List<ColumnMetadata> regularAndStaticColumns = new ArrayList<>(); - private boolean isView; - private Builder(String keyspace, String name, TableId id) { this.keyspace = keyspace; @@ -649,6 +659,12 @@ public final class TableMetadata return this; } + public Builder kind(Kind val) + { + kind = val; + return this; + } + public Builder params(TableParams val) { params = val.unbuild(); @@ -733,12 +749,6 @@ public final class TableMetadata return this; } - public Builder isView(boolean val) - { - isView = val; - return this; - } - public Builder flags(Set<Flag> val) { flags = val; @@ -979,6 +989,6 @@ public final class TableMetadata */ public boolean enforceStrictLiveness() { - return isView && Keyspace.open(keyspace).viewManager.getByName(name).enforceStrictLiveness(); + return isView() && Keyspace.open(keyspace).viewManager.getByName(name).enforceStrictLiveness(); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/CASRequest.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CASRequest.java b/src/java/org/apache/cassandra/service/CASRequest.java index 1db100d..88fb9bd 100644 --- a/src/java/org/apache/cassandra/service/CASRequest.java +++ b/src/java/org/apache/cassandra/service/CASRequest.java @@ -17,7 +17,7 @@ */ package org.apache.cassandra.service; -import org.apache.cassandra.db.SinglePartitionReadCommand; +import org.apache.cassandra.db.SinglePartitionReadQuery; import org.apache.cassandra.db.partitions.FilteredPartition; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.exceptions.InvalidRequestException; @@ -30,7 +30,7 @@ public interface CASRequest /** * The command to use to fetch the value to compare for the CAS. */ - public SinglePartitionReadCommand readCommand(int nowInSec); + public SinglePartitionReadQuery readCommand(int nowInSec); /** * Returns whether the provided CF, that represents the values fetched using the http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 6e0b92b..815e673 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -46,6 +46,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.concurrent.ScheduledExecutors; +import org.apache.cassandra.db.virtual.SystemViewsKeyspace; +import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry; +import org.apache.cassandra.db.virtual.VirtualSchemaKeyspace; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.StartupClusterConnectivityChecker; import org.apache.cassandra.schema.TableMetadata; @@ -249,6 +252,8 @@ public class CassandraDaemon throw e; } + VirtualKeyspaceRegistry.instance.register(VirtualSchemaKeyspace.instance); + VirtualKeyspaceRegistry.instance.register(SystemViewsKeyspace.instance); // clean up debris in the rest of the keyspaces for (String keyspaceName : Schema.instance.getKeyspaces()) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/ClientState.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/ClientState.java b/src/java/org/apache/cassandra/service/ClientState.java index c854737..234ac4f 100644 --- a/src/java/org/apache/cassandra/service/ClientState.java +++ b/src/java/org/apache/cassandra/service/ClientState.java @@ -29,6 +29,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.auth.*; +import org.apache.cassandra.db.virtual.VirtualSchemaKeyspace; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.TableMetadataRef; import org.apache.cassandra.config.DatabaseDescriptor; @@ -65,8 +66,12 @@ public class ClientState for (String cf : Arrays.asList(SystemKeyspace.LOCAL, SystemKeyspace.LEGACY_PEERS, SystemKeyspace.PEERS_V2)) READABLE_SYSTEM_RESOURCES.add(DataResource.table(SchemaConstants.SYSTEM_KEYSPACE_NAME, cf)); + // make all schema tables readable by default (required by the drivers) SchemaKeyspace.ALL.forEach(table -> READABLE_SYSTEM_RESOURCES.add(DataResource.table(SchemaConstants.SCHEMA_KEYSPACE_NAME, table))); + // make all virtual schema tables readable by default as well + VirtualSchemaKeyspace.instance.tables().forEach(t -> READABLE_SYSTEM_RESOURCES.add(t.metadata().resource)); + // neither clients nor tools need authentication/authorization if (DatabaseDescriptor.isDaemonInitialized()) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 37bfd17..7e9b0f9 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -259,7 +259,7 @@ public class StorageProxy implements StorageProxyMBean // read the current values and check they validate the conditions Tracing.trace("Reading existing values for CAS precondition"); - SinglePartitionReadCommand readCommand = request.readCommand(FBUtilities.nowInSeconds()); + SinglePartitionReadCommand readCommand = (SinglePartitionReadCommand) request.readCommand(FBUtilities.nowInSeconds()); ConsistencyLevel readConsistency = consistencyForPaxos == ConsistencyLevel.LOCAL_SERIAL ? ConsistencyLevel.LOCAL_QUORUM : ConsistencyLevel.QUORUM; FilteredPartition current; @@ -1633,7 +1633,7 @@ public class StorageProxy implements StorageProxyMBean public static PartitionIterator read(SinglePartitionReadCommand.Group group, ConsistencyLevel consistencyLevel, ClientState state, long queryStartNanoTime) throws UnavailableException, IsBootstrappingException, ReadFailureException, ReadTimeoutException, InvalidRequestException { - if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(group.commands)) + if (StorageService.instance.isBootstrapMode() && !systemKeyspaceQuery(group.queries)) { readMetrics.unavailables.mark(); readMetricsMap.get(consistencyLevel).unavailables.mark(); @@ -1649,11 +1649,11 @@ public class StorageProxy implements StorageProxyMBean throws InvalidRequestException, UnavailableException, ReadFailureException, ReadTimeoutException { assert state != null; - if (group.commands.size() > 1) + if (group.queries.size() > 1) throw new InvalidRequestException("SERIAL/LOCAL_SERIAL consistency may only be requested for one partition at a time"); long start = System.nanoTime(); - SinglePartitionReadCommand command = group.commands.get(0); + SinglePartitionReadCommand command = group.queries.get(0); TableMetadata metadata = command.metadata(); DecoratedKey key = command.partitionKey(); @@ -1685,7 +1685,7 @@ public class StorageProxy implements StorageProxyMBean throw new ReadFailureException(consistencyLevel, e.received, e.blockFor, false, e.failureReasonByEndpoint); } - result = fetchRows(group.commands, consistencyForCommitOrFetch, queryStartNanoTime); + result = fetchRows(group.queries, consistencyForCommitOrFetch, queryStartNanoTime); } catch (UnavailableException e) { @@ -1727,13 +1727,13 @@ public class StorageProxy implements StorageProxyMBean long start = System.nanoTime(); try { - PartitionIterator result = fetchRows(group.commands, consistencyLevel, queryStartNanoTime); + PartitionIterator result = fetchRows(group.queries, consistencyLevel, queryStartNanoTime); // Note that the only difference between the command in a group must be the partition key on which // they applied. - boolean enforceStrictLiveness = group.commands.get(0).metadata().enforceStrictLiveness(); + boolean enforceStrictLiveness = group.queries.get(0).metadata().enforceStrictLiveness(); // If we have more than one command, then despite each read command honoring the limit, the total result // might not honor it and so we should enforce it - if (group.commands.size() > 1) + if (group.queries.size() > 1) result = group.limits().filter(result, group.nowInSec(), group.selectsFullPartition(), enforceStrictLiveness); return result; } @@ -1761,7 +1761,7 @@ public class StorageProxy implements StorageProxyMBean readMetrics.addNano(latency); readMetricsMap.get(consistencyLevel).addNano(latency); // TODO avoid giving every command the same latency number. Can fix this in CASSADRA-5329 - for (ReadCommand command : group.commands) + for (ReadCommand command : group.queries) Keyspace.openAndGetStore(command.metadata()).metric.coordinatorReadLatency.update(latency, TimeUnit.NANOSECONDS); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 8570f10..4214644 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -69,6 +69,7 @@ import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.compaction.CompactionManager; import org.apache.cassandra.db.compaction.Verifier; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.virtual.VirtualKeyspaceRegistry; import org.apache.cassandra.dht.*; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token.TokenFactory; @@ -3456,12 +3457,18 @@ public class StorageService extends NotificationBroadcasterSupport implements IE } - private Keyspace getValidKeyspace(String keyspaceName) throws IOException + private void verifyKeyspaceIsValid(String keyspaceName) { + if (null != VirtualKeyspaceRegistry.instance.getKeyspaceNullable(keyspaceName)) + throw new IllegalArgumentException("Cannot perform any operations against virtual keyspace " + keyspaceName); + if (!Schema.instance.getKeyspaces().contains(keyspaceName)) - { - throw new IOException("Keyspace " + keyspaceName + " does not exist"); - } + throw new IllegalArgumentException("Keyspace " + keyspaceName + " does not exist"); + } + + private Keyspace getValidKeyspace(String keyspaceName) + { + verifyKeyspaceIsValid(keyspaceName); return Keyspace.open(keyspaceName); } @@ -4787,6 +4794,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public void truncate(String keyspace, String table) throws TimeoutException, IOException { + verifyKeyspaceIsValid(keyspace); + try { StorageProxy.truncateBlocking(keyspace, table); @@ -5249,6 +5258,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE { if (!isInitialized()) throw new RuntimeException("Not yet initialized, can't load new sstables"); + verifyKeyspaceIsValid(ksName); ColumnFamilyStore.loadNewSSTables(ksName, cfName); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java index 8ebbdf7..da64a0c 100644 --- a/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/AbstractQueryPager.java @@ -26,9 +26,9 @@ import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.transport.ProtocolVersion; -abstract class AbstractQueryPager implements QueryPager +abstract class AbstractQueryPager<T extends ReadQuery> implements QueryPager { - protected final ReadCommand command; + protected final T query; protected final DataLimits limits; protected final ProtocolVersion protocolVersion; private final boolean enforceStrictLiveness; @@ -43,12 +43,12 @@ abstract class AbstractQueryPager implements QueryPager private boolean exhausted; - protected AbstractQueryPager(ReadCommand command, ProtocolVersion protocolVersion) + protected AbstractQueryPager(T query, ProtocolVersion protocolVersion) { - this.command = command; + this.query = query; this.protocolVersion = protocolVersion; - this.limits = command.limits(); - this.enforceStrictLiveness = command.metadata().enforceStrictLiveness(); + this.limits = query.limits(); + this.enforceStrictLiveness = query.metadata().enforceStrictLiveness(); this.remaining = limits.count(); this.remainingInPartition = limits.perPartitionCount(); @@ -56,7 +56,7 @@ abstract class AbstractQueryPager implements QueryPager public ReadExecutionController executionController() { - return command.executionController(); + return query.executionController(); } public PartitionIterator fetchPage(int pageSize, ConsistencyLevel consistency, ClientState clientState, long queryStartNanoTime) @@ -65,8 +65,8 @@ abstract class AbstractQueryPager implements QueryPager return EmptyIterators.partition(); pageSize = Math.min(pageSize, remaining); - Pager pager = new RowPager(limits.forPaging(pageSize), command.nowInSec()); - return Transformation.apply(nextPageReadCommand(pageSize).execute(consistency, clientState, queryStartNanoTime), pager); + Pager pager = new RowPager(limits.forPaging(pageSize), query.nowInSec()); + return Transformation.apply(nextPageReadQuery(pageSize).execute(consistency, clientState, queryStartNanoTime), pager); } public PartitionIterator fetchPageInternal(int pageSize, ReadExecutionController executionController) @@ -75,8 +75,8 @@ abstract class AbstractQueryPager implements QueryPager return EmptyIterators.partition(); pageSize = Math.min(pageSize, remaining); - RowPager pager = new RowPager(limits.forPaging(pageSize), command.nowInSec()); - return Transformation.apply(nextPageReadCommand(pageSize).executeInternal(executionController), pager); + RowPager pager = new RowPager(limits.forPaging(pageSize), query.nowInSec()); + return Transformation.apply(nextPageReadQuery(pageSize).executeInternal(executionController), pager); } public UnfilteredPartitionIterator fetchPageUnfiltered(TableMetadata metadata, int pageSize, ReadExecutionController executionController) @@ -85,9 +85,9 @@ abstract class AbstractQueryPager implements QueryPager return EmptyIterators.unfilteredPartition(metadata); pageSize = Math.min(pageSize, remaining); - UnfilteredPager pager = new UnfilteredPager(limits.forPaging(pageSize), command.nowInSec()); + UnfilteredPager pager = new UnfilteredPager(limits.forPaging(pageSize), query.nowInSec()); - return Transformation.apply(nextPageReadCommand(pageSize).executeLocally(executionController), pager); + return Transformation.apply(nextPageReadQuery(pageSize).executeLocally(executionController), pager); } private class UnfilteredPager extends Pager<Unfiltered> @@ -128,7 +128,7 @@ abstract class AbstractQueryPager implements QueryPager private Pager(DataLimits pageLimits, int nowInSec) { - this.counter = pageLimits.newCounter(nowInSec, true, command.selectsFullPartition(), enforceStrictLiveness); + this.counter = pageLimits.newCounter(nowInSec, true, query.selectsFullPartition(), enforceStrictLiveness); this.pageLimits = pageLimits; } @@ -228,7 +228,7 @@ abstract class AbstractQueryPager implements QueryPager return remainingInPartition; } - protected abstract ReadCommand nextPageReadCommand(int pageSize); + protected abstract T nextPageReadQuery(int pageSize); protected abstract void recordLast(DecoratedKey key, Row row); protected abstract boolean isPreviouslyReturnedPartition(DecoratedKey key); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java index 9dae11c..ca16967 100644 --- a/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java +++ b/src/java/org/apache/cassandra/service/pager/MultiPartitionPager.java @@ -31,20 +31,20 @@ import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.service.ClientState; /** - * Pager over a list of ReadCommand. + * Pager over a list of SinglePartitionReadQuery. * - * Note that this is not easy to make efficient. Indeed, we need to page the first command fully before - * returning results from the next one, but if the result returned by each command is small (compared to pageSize), - * paging the commands one at a time under-performs compared to parallelizing. On the other, if we parallelize - * and each command raised pageSize results, we'll end up with commands.size() * pageSize results in memory, which + * Note that this is not easy to make efficient. Indeed, we need to page the first query fully before + * returning results from the next one, but if the result returned by each query is small (compared to pageSize), + * paging the queries one at a time under-performs compared to parallelizing. On the other, if we parallelize + * and each query raised pageSize results, we'll end up with queries.size() * pageSize results in memory, which * defeats the purpose of paging. * - * For now, we keep it simple (somewhat) and just do one command at a time. Provided that we make sure to not + * For now, we keep it simple (somewhat) and just do one query at a time. Provided that we make sure to not * create a pager unless we need to, this is probably fine. Though if we later want to get fancy, we could use the - * cfs meanPartitionSize to decide if parallelizing some of the command might be worth it while being confident we don't + * cfs meanPartitionSize to decide if parallelizing some of the query might be worth it while being confident we don't * blow out memory. */ -public class MultiPartitionPager implements QueryPager +public class MultiPartitionPager<T extends SinglePartitionReadQuery> implements QueryPager { private final SinglePartitionPager[] pagers; private final DataLimits limit; @@ -54,33 +54,33 @@ public class MultiPartitionPager implements QueryPager private int remaining; private int current; - public MultiPartitionPager(SinglePartitionReadCommand.Group group, PagingState state, ProtocolVersion protocolVersion) + public MultiPartitionPager(SinglePartitionReadQuery.Group<T> group, PagingState state, ProtocolVersion protocolVersion) { this.limit = group.limits(); this.nowInSec = group.nowInSec(); int i = 0; - // If it's not the beginning (state != null), we need to find where we were and skip previous commands + // If it's not the beginning (state != null), we need to find where we were and skip previous queries // since they are done. if (state != null) - for (; i < group.commands.size(); i++) - if (group.commands.get(i).partitionKey().getKey().equals(state.partitionKey)) + for (; i < group.queries.size(); i++) + if (group.queries.get(i).partitionKey().getKey().equals(state.partitionKey)) break; - if (i >= group.commands.size()) + if (i >= group.queries.size()) { pagers = null; return; } - pagers = new SinglePartitionPager[group.commands.size() - i]; + pagers = new SinglePartitionPager[group.queries.size() - i]; // 'i' is on the first non exhausted pager for the previous page (or the first one) - SinglePartitionReadCommand command = group.commands.get(i); - pagers[0] = command.getPager(state, protocolVersion); + T query = group.queries.get(i); + pagers[0] = query.getPager(state, protocolVersion); // Following ones haven't been started yet - for (int j = i + 1; j < group.commands.size(); j++) - pagers[j - i] = group.commands.get(j).getPager(null, protocolVersion); + for (int j = i + 1; j < group.queries.size(); j++) + pagers[j - i] = group.queries.get(j).getPager(null, protocolVersion); remaining = state == null ? limit.count() : state.remaining; } @@ -103,11 +103,11 @@ public class MultiPartitionPager implements QueryPager SinglePartitionPager[] newPagers = Arrays.copyOf(pagers, pagers.length); newPagers[current] = newPagers[current].withUpdatedLimit(newLimits); - return new MultiPartitionPager(newPagers, - newLimits, - nowInSec, - remaining, - current); + return new MultiPartitionPager<T>(newPagers, + newLimits, + nowInSec, + remaining, + current); } public PagingState state() http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java index ba6862d..cebf3c6 100644 --- a/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java +++ b/src/java/org/apache/cassandra/service/pager/PartitionRangeQueryPager.java @@ -21,37 +21,36 @@ import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.DataLimits; import org.apache.cassandra.db.rows.Row; import org.apache.cassandra.dht.*; -import org.apache.cassandra.exceptions.RequestExecutionException; import org.apache.cassandra.transport.ProtocolVersion; /** - * Pages a PartitionRangeReadCommand. + * Pages a PartitionRangeReadQuery. */ -public class PartitionRangeQueryPager extends AbstractQueryPager +public class PartitionRangeQueryPager extends AbstractQueryPager<PartitionRangeReadQuery> { private volatile DecoratedKey lastReturnedKey; private volatile PagingState.RowMark lastReturnedRow; - public PartitionRangeQueryPager(PartitionRangeReadCommand command, PagingState state, ProtocolVersion protocolVersion) + public PartitionRangeQueryPager(PartitionRangeReadQuery query, PagingState state, ProtocolVersion protocolVersion) { - super(command, protocolVersion); + super(query, protocolVersion); if (state != null) { - lastReturnedKey = command.metadata().partitioner.decorateKey(state.partitionKey); + lastReturnedKey = query.metadata().partitioner.decorateKey(state.partitionKey); lastReturnedRow = state.rowMark; restoreState(lastReturnedKey, state.remaining, state.remainingInPartition); } } - public PartitionRangeQueryPager(ReadCommand command, + public PartitionRangeQueryPager(PartitionRangeReadQuery query, ProtocolVersion protocolVersion, DecoratedKey lastReturnedKey, PagingState.RowMark lastReturnedRow, int remaining, int remainingInPartition) { - super(command, protocolVersion); + super(query, protocolVersion); this.lastReturnedKey = lastReturnedKey; this.lastReturnedRow = lastReturnedRow; restoreState(lastReturnedKey, remaining, remainingInPartition); @@ -59,7 +58,7 @@ public class PartitionRangeQueryPager extends AbstractQueryPager public PartitionRangeQueryPager withUpdatedLimit(DataLimits newLimits) { - return new PartitionRangeQueryPager(command.withUpdatedLimit(newLimits), + return new PartitionRangeQueryPager(query.withUpdatedLimit(newLimits), protocolVersion, lastReturnedKey, lastReturnedRow, @@ -74,16 +73,16 @@ public class PartitionRangeQueryPager extends AbstractQueryPager : new PagingState(lastReturnedKey.getKey(), lastReturnedRow, maxRemaining(), remainingInPartition()); } - protected ReadCommand nextPageReadCommand(int pageSize) - throws RequestExecutionException + @Override + protected PartitionRangeReadQuery nextPageReadQuery(int pageSize) { DataLimits limits; - DataRange fullRange = ((PartitionRangeReadCommand)command).dataRange(); + DataRange fullRange = query.dataRange(); DataRange pageRange; if (lastReturnedKey == null) { pageRange = fullRange; - limits = command.limits().forPaging(pageSize); + limits = query.limits().forPaging(pageSize); } else { @@ -92,17 +91,17 @@ public class PartitionRangeQueryPager extends AbstractQueryPager AbstractBounds<PartitionPosition> bounds = makeKeyBounds(lastReturnedKey, includeLastKey); if (includeLastKey) { - pageRange = fullRange.forPaging(bounds, command.metadata().comparator, lastReturnedRow.clustering(command.metadata()), false); - limits = command.limits().forPaging(pageSize, lastReturnedKey.getKey(), remainingInPartition()); + pageRange = fullRange.forPaging(bounds, query.metadata().comparator, lastReturnedRow.clustering(query.metadata()), false); + limits = query.limits().forPaging(pageSize, lastReturnedKey.getKey(), remainingInPartition()); } else { pageRange = fullRange.forSubRange(bounds); - limits = command.limits().forPaging(pageSize); + limits = query.limits().forPaging(pageSize); } } - return ((PartitionRangeReadCommand) command).withUpdatedLimitsAndDataRange(limits, pageRange); + return query.withUpdatedLimitsAndDataRange(limits, pageRange); } protected void recordLast(DecoratedKey key, Row last) @@ -111,7 +110,7 @@ public class PartitionRangeQueryPager extends AbstractQueryPager { lastReturnedKey = key; if (last.clustering() != Clustering.STATIC_CLUSTERING) - lastReturnedRow = PagingState.RowMark.create(command.metadata(), last, protocolVersion); + lastReturnedRow = PagingState.RowMark.create(query.metadata(), last, protocolVersion); } } @@ -123,18 +122,16 @@ public class PartitionRangeQueryPager extends AbstractQueryPager private AbstractBounds<PartitionPosition> makeKeyBounds(PartitionPosition lastReturnedKey, boolean includeLastKey) { - AbstractBounds<PartitionPosition> bounds = ((PartitionRangeReadCommand)command).dataRange().keyRange(); + AbstractBounds<PartitionPosition> bounds = query.dataRange().keyRange(); if (bounds instanceof Range || bounds instanceof Bounds) { return includeLastKey - ? new Bounds<PartitionPosition>(lastReturnedKey, bounds.right) - : new Range<PartitionPosition>(lastReturnedKey, bounds.right); - } - else - { - return includeLastKey - ? new IncludingExcludingBounds<PartitionPosition>(lastReturnedKey, bounds.right) - : new ExcludingBounds<PartitionPosition>(lastReturnedKey, bounds.right); + ? new Bounds<>(lastReturnedKey, bounds.right) + : new Range<>(lastReturnedKey, bounds.right); } + + return includeLastKey + ? new IncludingExcludingBounds<>(lastReturnedKey, bounds.right) + : new ExcludingBounds<>(lastReturnedKey, bounds.right); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java index e95c358..93a0265 100644 --- a/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java +++ b/src/java/org/apache/cassandra/service/pager/SinglePartitionPager.java @@ -29,40 +29,36 @@ import org.apache.cassandra.transport.ProtocolVersion; * * For use by MultiPartitionPager. */ -public class SinglePartitionPager extends AbstractQueryPager +public class SinglePartitionPager extends AbstractQueryPager<SinglePartitionReadQuery> { - private final SinglePartitionReadCommand command; - private volatile PagingState.RowMark lastReturned; - public SinglePartitionPager(SinglePartitionReadCommand command, PagingState state, ProtocolVersion protocolVersion) + public SinglePartitionPager(SinglePartitionReadQuery query, PagingState state, ProtocolVersion protocolVersion) { - super(command, protocolVersion); - this.command = command; + super(query, protocolVersion); if (state != null) { lastReturned = state.rowMark; - restoreState(command.partitionKey(), state.remaining, state.remainingInPartition); + restoreState(query.partitionKey(), state.remaining, state.remainingInPartition); } } - private SinglePartitionPager(SinglePartitionReadCommand command, + private SinglePartitionPager(SinglePartitionReadQuery query, ProtocolVersion protocolVersion, PagingState.RowMark rowMark, int remaining, int remainingInPartition) { - super(command, protocolVersion); - this.command = command; + super(query, protocolVersion); this.lastReturned = rowMark; - restoreState(command.partitionKey(), remaining, remainingInPartition); + restoreState(query.partitionKey(), remaining, remainingInPartition); } @Override public SinglePartitionPager withUpdatedLimit(DataLimits newLimits) { - return new SinglePartitionPager(command.withUpdatedLimit(newLimits), + return new SinglePartitionPager(query.withUpdatedLimit(newLimits), protocolVersion, lastReturned, maxRemaining(), @@ -71,12 +67,12 @@ public class SinglePartitionPager extends AbstractQueryPager public ByteBuffer key() { - return command.partitionKey().getKey(); + return query.partitionKey().getKey(); } public DataLimits limits() { - return command.limits(); + return query.limits(); } public PagingState state() @@ -86,20 +82,21 @@ public class SinglePartitionPager extends AbstractQueryPager : new PagingState(null, lastReturned, maxRemaining(), remainingInPartition()); } - protected ReadCommand nextPageReadCommand(int pageSize) + @Override + protected SinglePartitionReadQuery nextPageReadQuery(int pageSize) { - Clustering clustering = lastReturned == null ? null : lastReturned.clustering(command.metadata()); + Clustering clustering = lastReturned == null ? null : lastReturned.clustering(query.metadata()); DataLimits limits = lastReturned == null ? limits().forPaging(pageSize) : limits().forPaging(pageSize, key(), remainingInPartition()); - return command.forPaging(clustering, limits); + return query.forPaging(clustering, limits); } protected void recordLast(DecoratedKey key, Row last) { if (last != null && last.clustering() != Clustering.STATIC_CLUSTERING) - lastReturned = PagingState.RowMark.create(command.metadata(), last, protocolVersion); + lastReturned = PagingState.RowMark.create(query.metadata(), last, protocolVersion); } protected boolean isPreviouslyReturnedPartition(DecoratedKey key) http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/test/unit/org/apache/cassandra/cache/CacheProviderTest.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java index 7b8ef94..0852312 100644 --- a/test/unit/org/apache/cassandra/cache/CacheProviderTest.java +++ b/test/unit/org/apache/cassandra/cache/CacheProviderTest.java @@ -186,6 +186,7 @@ public class CacheProviderTest assertEquals(key1.hashCode(), key2.hashCode()); tm = TableMetadata.builder("ks", "tab.indexFoo", id1) + .kind(TableMetadata.Kind.INDEX) .addPartitionKeyColumn("pk", UTF8Type.instance) .indexes(Indexes.of(IndexMetadata.fromSchemaMetadata("indexFoo", IndexMetadata.Kind.KEYS, Collections.emptyMap()))) .build(); http://git-wip-us.apache.org/repos/asf/cassandra/blob/0d464cd2/test/unit/org/apache/cassandra/cql3/CQLTester.java ---------------------------------------------------------------------- diff --git a/test/unit/org/apache/cassandra/cql3/CQLTester.java b/test/unit/org/apache/cassandra/cql3/CQLTester.java index e53342d..662e804 100644 --- a/test/unit/org/apache/cassandra/cql3/CQLTester.java +++ b/test/unit/org/apache/cassandra/cql3/CQLTester.java @@ -837,6 +837,11 @@ public abstract class CQLTester return sessionNet(protocolVersion).execute(formatQuery(query), values); } + protected com.datastax.driver.core.ResultSet executeNet(String query, Object... values) throws Throwable + { + return sessionNet().execute(formatQuery(query), values); + } + protected com.datastax.driver.core.ResultSet executeNetWithPaging(ProtocolVersion version, String query, int pageSize) throws Throwable { return sessionNet(version).execute(new SimpleStatement(formatQuery(query)).setFetchSize(pageSize)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org