http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad99802/src/java/org/apache/cassandra/schema/SchemaKeyspace.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/schema/SchemaKeyspace.java index e88f037,164e32d..ca3c69f --- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java +++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java @@@ -102,167 -104,170 +102,166 @@@ public final class SchemaKeyspac */ private static final Set<String> TABLES_WITH_CDC_ADDED = ImmutableSet.of(TABLES, VIEWS); - - /** - * Until we upgrade the messaging service version, that is version 4.0, we must preserve the old order (before CASSANDRA-12213) - * for digest calculations, otherwise the nodes will never agree on the schema during a rolling upgrade, see CASSANDRA-13559. - */ - public static final ImmutableList<String> ALL_FOR_DIGEST = - ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, VIEWS, TYPES, FUNCTIONS, AGGREGATES, INDEXES); - - private static final CFMetaData Keyspaces = - compile(KEYSPACES, - "keyspace definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "durable_writes boolean," - + "replication frozen<map<text, text>>," - + "PRIMARY KEY ((keyspace_name)))"); - - private static final CFMetaData Tables = - compile(TABLES, - "table definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "table_name text," - + "bloom_filter_fp_chance double," - + "caching frozen<map<text, text>>," - + "comment text," - + "compaction frozen<map<text, text>>," - + "compression frozen<map<text, text>>," - + "crc_check_chance double," - + "dclocal_read_repair_chance double," - + "default_time_to_live int," - + "extensions frozen<map<text, blob>>," - + "flags frozen<set<text>>," // SUPER, COUNTER, DENSE, COMPOUND - + "gc_grace_seconds int," - + "id uuid," - + "max_index_interval int," - + "memtable_flush_period_in_ms int," - + "min_index_interval int," - + "read_repair_chance double," - + "speculative_retry text," - + "cdc boolean," - + "PRIMARY KEY ((keyspace_name), table_name))"); - - private static final CFMetaData Columns = - compile(COLUMNS, - "column definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "table_name text," - + "column_name text," - + "clustering_order text," - + "column_name_bytes blob," - + "kind text," - + "position int," - + "type text," - + "PRIMARY KEY ((keyspace_name), table_name, column_name))"); - - private static final CFMetaData DroppedColumns = - compile(DROPPED_COLUMNS, - "dropped column registry", - "CREATE TABLE %s (" - + "keyspace_name text," - + "table_name text," - + "column_name text," - + "dropped_time timestamp," - + "type text," - + "PRIMARY KEY ((keyspace_name), table_name, column_name))"); - - private static final CFMetaData Triggers = - compile(TRIGGERS, - "trigger definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "table_name text," - + "trigger_name text," - + "options frozen<map<text, text>>," - + "PRIMARY KEY ((keyspace_name), table_name, trigger_name))"); - - private static final CFMetaData Views = - compile(VIEWS, - "view definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "view_name text," - + "base_table_id uuid," - + "base_table_name text," - + "where_clause text," - + "bloom_filter_fp_chance double," - + "caching frozen<map<text, text>>," - + "comment text," - + "compaction frozen<map<text, text>>," - + "compression frozen<map<text, text>>," - + "crc_check_chance double," - + "dclocal_read_repair_chance double," - + 
"default_time_to_live int," - + "extensions frozen<map<text, blob>>," - + "gc_grace_seconds int," - + "id uuid," - + "include_all_columns boolean," - + "max_index_interval int," - + "memtable_flush_period_in_ms int," - + "min_index_interval int," - + "read_repair_chance double," - + "speculative_retry text," - + "cdc boolean," - + "PRIMARY KEY ((keyspace_name), view_name))"); - - private static final CFMetaData Indexes = - compile(INDEXES, - "secondary index definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "table_name text," - + "index_name text," - + "kind text," - + "options frozen<map<text, text>>," - + "PRIMARY KEY ((keyspace_name), table_name, index_name))"); - - private static final CFMetaData Types = - compile(TYPES, - "user defined type definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "type_name text," - + "field_names frozen<list<text>>," - + "field_types frozen<list<text>>," - + "PRIMARY KEY ((keyspace_name), type_name))"); - - private static final CFMetaData Functions = - compile(FUNCTIONS, - "user defined function definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "function_name text," - + "argument_types frozen<list<text>>," - + "argument_names frozen<list<text>>," - + "body text," - + "language text," - + "return_type text," - + "called_on_null_input boolean," - + "PRIMARY KEY ((keyspace_name), function_name, argument_types))"); - - private static final CFMetaData Aggregates = - compile(AGGREGATES, - "user defined aggregate definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "aggregate_name text," - + "argument_types frozen<list<text>>," - + "final_func text," - + "initcond text," - + "return_type text," - + "state_func text," - + "state_type text," - + "PRIMARY KEY ((keyspace_name), aggregate_name, argument_types))"); - - public static final List<CFMetaData> ALL_TABLE_METADATA = + private static final TableMetadata Keyspaces = + parse(KEYSPACES, + "keyspace definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "durable_writes boolean," + + "replication frozen<map<text, text>>," + + "PRIMARY KEY ((keyspace_name)))"); + + private static final TableMetadata Tables = + parse(TABLES, + "table definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "table_name text," + + "bloom_filter_fp_chance double," + + "caching frozen<map<text, text>>," + + "comment text," + + "compaction frozen<map<text, text>>," + + "compression frozen<map<text, text>>," + + "crc_check_chance double," - + "dclocal_read_repair_chance double," ++ + "dclocal_read_repair_chance double," // no longer used, left for drivers' sake + + "default_time_to_live int," + + "extensions frozen<map<text, blob>>," + + "flags frozen<set<text>>," // SUPER, COUNTER, DENSE, COMPOUND + + "gc_grace_seconds int," + + "id uuid," + + "max_index_interval int," + + "memtable_flush_period_in_ms int," + + "min_index_interval int," - + "read_repair_chance double," ++ + "read_repair_chance double," // no longer used, left for drivers' sake + + "speculative_retry text," + + "cdc boolean," + + "PRIMARY KEY ((keyspace_name), table_name))"); + + private static final TableMetadata Columns = + parse(COLUMNS, + "column definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "table_name text," + + "column_name text," + + "clustering_order text," + + "column_name_bytes blob," + + "kind text," + + "position int," + + "type text," + + "PRIMARY KEY ((keyspace_name), table_name, column_name))"); + + private static final TableMetadata DroppedColumns = + 
parse(DROPPED_COLUMNS, + "dropped column registry", + "CREATE TABLE %s (" + + "keyspace_name text," + + "table_name text," + + "column_name text," + + "dropped_time timestamp," + + "type text," + + "kind text," + + "PRIMARY KEY ((keyspace_name), table_name, column_name))"); + + private static final TableMetadata Triggers = + parse(TRIGGERS, + "trigger definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "table_name text," + + "trigger_name text," + + "options frozen<map<text, text>>," + + "PRIMARY KEY ((keyspace_name), table_name, trigger_name))"); + + private static final TableMetadata Views = + parse(VIEWS, + "view definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "view_name text," + + "base_table_id uuid," + + "base_table_name text," + + "where_clause text," + + "bloom_filter_fp_chance double," + + "caching frozen<map<text, text>>," + + "comment text," + + "compaction frozen<map<text, text>>," + + "compression frozen<map<text, text>>," + + "crc_check_chance double," - + "dclocal_read_repair_chance double," ++ + "dclocal_read_repair_chance double," // no longer used, left for drivers' sake + + "default_time_to_live int," + + "extensions frozen<map<text, blob>>," + + "gc_grace_seconds int," + + "id uuid," + + "include_all_columns boolean," + + "max_index_interval int," + + "memtable_flush_period_in_ms int," + + "min_index_interval int," - + "read_repair_chance double," ++ + "read_repair_chance double," // no longer used, left for drivers' sake + + "speculative_retry text," + + "cdc boolean," + + "PRIMARY KEY ((keyspace_name), view_name))"); + + private static final TableMetadata Indexes = + parse(INDEXES, + "secondary index definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "table_name text," + + "index_name text," + + "kind text," + + "options frozen<map<text, text>>," + + "PRIMARY KEY ((keyspace_name), table_name, index_name))"); + + private static final TableMetadata Types = + parse(TYPES, + "user defined type definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "type_name text," + + "field_names frozen<list<text>>," + + "field_types frozen<list<text>>," + + "PRIMARY KEY ((keyspace_name), type_name))"); + + private static final TableMetadata Functions = + parse(FUNCTIONS, + "user defined function definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "function_name text," + + "argument_types frozen<list<text>>," + + "argument_names frozen<list<text>>," + + "body text," + + "language text," + + "return_type text," + + "called_on_null_input boolean," + + "PRIMARY KEY ((keyspace_name), function_name, argument_types))"); + + private static final TableMetadata Aggregates = + parse(AGGREGATES, + "user defined aggregate definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "aggregate_name text," + + "argument_types frozen<list<text>>," + + "final_func text," + + "initcond text," + + "return_type text," + + "state_func text," + + "state_type text," + + "PRIMARY KEY ((keyspace_name), aggregate_name, argument_types))"); + + private static final List<TableMetadata> ALL_TABLE_METADATA = ImmutableList.of(Keyspaces, Tables, Columns, Triggers, DroppedColumns, Views, Types, Functions, Aggregates, Indexes); - private static CFMetaData compile(String name, String description, String schema) + private static TableMetadata parse(String name, String description, String cql) { - return CFMetaData.compile(String.format(schema, name), SchemaConstants.SCHEMA_KEYSPACE_NAME) - .comment(description) - .gcGraceSeconds((int) 
TimeUnit.DAYS.toSeconds(7)); + return CreateTableStatement.parse(format(cql, name), SchemaConstants.SCHEMA_KEYSPACE_NAME) + .id(TableId.forSystemTable(SchemaConstants.SCHEMA_KEYSPACE_NAME, name)) - .dcLocalReadRepairChance(0.0) + .gcGraceSeconds((int) TimeUnit.DAYS.toSeconds(7)) + .memtableFlushPeriod((int) TimeUnit.HOURS.toMillis(1)) + .comment(description) + .build(); } public static KeyspaceMetadata metadata() @@@ -524,13 -553,13 +523,13 @@@ { builder.add("bloom_filter_fp_chance", params.bloomFilterFpChance) .add("comment", params.comment) -- .add("dclocal_read_repair_chance", params.dcLocalReadRepairChance) ++ .add("dclocal_read_repair_chance", 0.0) // no longer used, left for drivers' sake .add("default_time_to_live", params.defaultTimeToLive) .add("gc_grace_seconds", params.gcGraceSeconds) .add("max_index_interval", params.maxIndexInterval) .add("memtable_flush_period_in_ms", params.memtableFlushPeriodInMs) .add("min_index_interval", params.minIndexInterval) -- .add("read_repair_chance", params.readRepairChance) ++ .add("read_repair_chance", 0.0) // no longer used, left for drivers' sake .add("speculative_retry", params.speculativeRetry.toString()) .add("crc_check_chance", params.crcCheckChance) .add("caching", params.caching.asMap()) @@@ -994,17 -1070,17 +993,15 @@@ .comment(row.getString("comment")) .compaction(CompactionParams.fromMap(row.getFrozenTextMap("compaction"))) .compression(CompressionParams.fromMap(row.getFrozenTextMap("compression"))) -- .dcLocalReadRepairChance(row.getDouble("dclocal_read_repair_chance")) .defaultTimeToLive(row.getInt("default_time_to_live")) .extensions(row.getFrozenMap("extensions", UTF8Type.instance, BytesType.instance)) .gcGraceSeconds(row.getInt("gc_grace_seconds")) .maxIndexInterval(row.getInt("max_index_interval")) .memtableFlushPeriodInMs(row.getInt("memtable_flush_period_in_ms")) .minIndexInterval(row.getInt("min_index_interval")) -- .readRepairChance(row.getDouble("read_repair_chance")) .crcCheckChance(row.getDouble("crc_check_chance")) - .speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry"))) - .cdc(row.has("cdc") ? row.getBoolean("cdc") : false) + .speculativeRetry(SpeculativeRetryPolicy.fromString(row.getString("speculative_retry"))) + .cdc(row.has("cdc") && row.getBoolean("cdc")) .build(); }
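The SchemaKeyspace hunks above swap CFMetaData.compile() for CreateTableStatement.parse() feeding the new TableMetadata builder, and keep the retired read_repair_chance/dclocal_read_repair_chance columns in system_schema pinned to 0.0 purely so that drivers which still parse those columns keep working. As a minimal sketch of the builder API introduced in the next file (every method used here appears in the TableMetadata diff below; the wrapper class, keyspace and table names are invented for illustration, and build() falls back to DatabaseDescriptor.getPartitioner() when no partitioner is set, so an initialized node configuration is assumed):

import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.schema.TableMetadata;

public class TableMetadataBuilderSketch
{
    public static TableMetadata example()
    {
        return TableMetadata.builder("ks", "t")          // a TableId is generated in build() if none was set
                            .comment("example table")    // delegates to TableParams.Builder
                            .addPartitionKeyColumn("pk", UTF8Type.instance)
                            .addClusteringColumn("ck", UTF8Type.instance)
                            .addRegularColumn("v", BytesType.instance)
                            .build();
    }
}

Note that the builder exposes no readRepairChance()/dcLocalReadRepairChance() setters any more; they are deleted from both TableMetadata.Builder and TableParams.Builder further down.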
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad99802/src/java/org/apache/cassandra/schema/TableMetadata.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/schema/TableMetadata.java index 3ccc3c5,0000000..4634438 mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/schema/TableMetadata.java +++ b/src/java/org/apache/cassandra/schema/TableMetadata.java @@@ -1,1000 -1,0 +1,984 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.schema; + +import java.nio.ByteBuffer; +import java.util.*; +import java.util.Objects; + +import com.google.common.base.MoreObjects; +import com.google.common.collect.*; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.auth.DataResource; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.db.*; +import org.apache.cassandra.db.marshal.*; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.service.reads.SpeculativeRetryPolicy; +import org.apache.cassandra.utils.AbstractIterator; +import org.github.jamm.Unmetered; + +import static java.lang.String.format; +import static java.util.stream.Collectors.toList; +import static java.util.stream.Collectors.toSet; + +import static com.google.common.collect.Iterables.transform; +import static org.apache.cassandra.schema.IndexMetadata.isNameValid; + +@Unmetered +public final class TableMetadata +{ + private static final Logger logger = LoggerFactory.getLogger(TableMetadata.class); + private static final ImmutableSet<Flag> DEFAULT_CQL_FLAGS = ImmutableSet.of(Flag.COMPOUND); + private static final ImmutableSet<Flag> DEPRECATED_CS_FLAGS = ImmutableSet.of(Flag.DENSE, Flag.SUPER); + + public static final String COMPACT_STORAGE_HALT_MESSAGE = + "Compact Tables are not allowed in Cassandra starting with version 4.0. " + + "Use the `ALTER ... DROP COMPACT STORAGE` command supplied in 3.x/3.11 Cassandra " + + "to migrate off Compact Storage."; + + private static final String COMPACT_STORAGE_DEPRECATION_MESSAGE = + "An incorrect set of flags was detected in table {}.{}: '{}'. \n" + "Starting with version 4.0, '{}' flags are deprecated and every table has to have the COMPOUND flag. 
\n" + + "Forcing the following set of flags: '{}'"; + + public enum Flag + { + SUPER, COUNTER, DENSE, COMPOUND; + + public static boolean isCQLCompatible(Set<Flag> flags) + { + return !flags.contains(Flag.DENSE) && !flags.contains(Flag.SUPER) && flags.contains(Flag.COMPOUND); + } + + public static Set<Flag> fromStringSet(Set<String> strings) + { + return strings.stream().map(String::toUpperCase).map(Flag::valueOf).collect(toSet()); + } + + public static Set<String> toStringSet(Set<Flag> flags) + { + return flags.stream().map(Flag::toString).map(String::toLowerCase).collect(toSet()); + } + } + + public final String keyspace; + public final String name; + public final TableId id; + + public final IPartitioner partitioner; + public final TableParams params; + public final ImmutableSet<Flag> flags; + + private final boolean isView; + private final String indexName; // derived from table name + + /* + * All CQL3 columns definition are stored in the columns map. + * On top of that, we keep separated collection of each kind of definition, to + * 1) allow easy access to each kind and + * 2) for the partition key and clustering key ones, those list are ordered by the "component index" of the elements. + */ + public final ImmutableMap<ByteBuffer, DroppedColumn> droppedColumns; + final ImmutableMap<ByteBuffer, ColumnMetadata> columns; + + private final ImmutableList<ColumnMetadata> partitionKeyColumns; + private final ImmutableList<ColumnMetadata> clusteringColumns; + private final RegularAndStaticColumns regularAndStaticColumns; + + public final Indexes indexes; + public final Triggers triggers; + + // derived automatically from flags and columns + public final AbstractType<?> partitionKeyType; + public final ClusteringComparator comparator; + + /* + * For dense tables, this alias the single non-PK column the table contains (since it can only have one). We keep + * that as convenience to access that column more easily (but we could replace calls by regularAndStaticColumns().iterator().next() + * for those tables in practice). + */ + public final ColumnMetadata compactValueColumn; + + // performance hacks; TODO see if all are really necessary + public final DataResource resource; + + private TableMetadata(Builder builder) + { + if (!Flag.isCQLCompatible(builder.flags)) + { + flags = ImmutableSet.copyOf(Sets.union(Sets.difference(builder.flags, DEPRECATED_CS_FLAGS), DEFAULT_CQL_FLAGS)); + logger.warn(COMPACT_STORAGE_DEPRECATION_MESSAGE, builder.keyspace, builder.name, builder.flags, DEPRECATED_CS_FLAGS, flags); + } + else + { + flags = Sets.immutableEnumSet(builder.flags); + } + keyspace = builder.keyspace; + name = builder.name; + id = builder.id; + + partitioner = builder.partitioner; + params = builder.params.build(); + isView = builder.isView; + + indexName = name.contains(".") + ? name.substring(name.indexOf('.') + 1) + : null; + + droppedColumns = ImmutableMap.copyOf(builder.droppedColumns); + Collections.sort(builder.partitionKeyColumns); + partitionKeyColumns = ImmutableList.copyOf(builder.partitionKeyColumns); + Collections.sort(builder.clusteringColumns); + clusteringColumns = ImmutableList.copyOf(builder.clusteringColumns); + regularAndStaticColumns = RegularAndStaticColumns.builder().addAll(builder.regularAndStaticColumns).build(); + columns = ImmutableMap.copyOf(builder.columns); + + indexes = builder.indexes; + triggers = builder.triggers; + + partitionKeyType = partitionKeyColumns.size() == 1 + ? 
partitionKeyColumns.get(0).type + : CompositeType.getInstance(transform(partitionKeyColumns, t -> t.type)); + + comparator = new ClusteringComparator(transform(clusteringColumns, c -> c.type)); + + compactValueColumn = isCompactTable() + ? CompactTables.getCompactValueColumn(regularAndStaticColumns, isSuper()) + : null; + + resource = DataResource.table(keyspace, name); + } + + public static Builder builder(String keyspace, String table) + { + return new Builder(keyspace, table); + } + + public static Builder builder(String keyspace, String table, TableId id) + { + return new Builder(keyspace, table, id); + } + + public Builder unbuild() + { + return builder(keyspace, name, id) + .partitioner(partitioner) + .params(params) + .flags(flags) + .isView(isView) + .addColumns(columns()) + .droppedColumns(droppedColumns) + .indexes(indexes) + .triggers(triggers); + } + + public boolean isView() + { + return isView; + } + + public boolean isIndex() + { + return indexName != null; + } + + public Optional<String> indexName() + { + return Optional.ofNullable(indexName); + } + + /* + * We call a CF dense when each component of the comparator is a clustering column, i.e. no + * component is used to store regular column names. In other words, non-composite static "thrift" + * and CQL3 CFs are *not* dense. + */ + public boolean isDense() + { + return flags.contains(Flag.DENSE); + } + + public boolean isCompound() + { + return flags.contains(Flag.COMPOUND); + } + + public boolean isSuper() + { + return flags.contains(Flag.SUPER); + } + + public boolean isCounter() + { + return flags.contains(Flag.COUNTER); + } + + public boolean isCQLTable() + { + return !isSuper() && !isDense() && isCompound(); + } + + public boolean isCompactTable() + { + return !isCQLTable(); + } + + public boolean isStaticCompactTable() + { + return !isSuper() && !isDense() && !isCompound(); + } + + public ImmutableCollection<ColumnMetadata> columns() + { + return columns.values(); + } + + public Iterable<ColumnMetadata> primaryKeyColumns() + { + return Iterables.concat(partitionKeyColumns, clusteringColumns); + } + + public ImmutableList<ColumnMetadata> partitionKeyColumns() + { + return partitionKeyColumns; + } + + public ImmutableList<ColumnMetadata> clusteringColumns() + { + return clusteringColumns; + } + + public RegularAndStaticColumns regularAndStaticColumns() + { + return regularAndStaticColumns; + } + + public Columns regularColumns() + { + return regularAndStaticColumns.regulars; + } + + public Columns staticColumns() + { + return regularAndStaticColumns.statics; + } + + /* + * An iterator over all column definitions that respects the order of a SELECT *. + * This also "hides" the clustering/regular columns of a non-CQL3 non-dense table for backward compatibility's + * sake. + */ + public Iterator<ColumnMetadata> allColumnsInSelectOrder() + { + final boolean isStaticCompactTable = isStaticCompactTable(); + final boolean noNonPkColumns = isCompactTable() && CompactTables.hasEmptyCompactValue(this); + + return new AbstractIterator<ColumnMetadata>() + { + private final Iterator<ColumnMetadata> partitionKeyIter = partitionKeyColumns.iterator(); + private final Iterator<ColumnMetadata> clusteringIter = + isStaticCompactTable ? Collections.emptyIterator() : clusteringColumns.iterator(); + private final Iterator<ColumnMetadata> otherColumns = + noNonPkColumns + ? Collections.emptyIterator() + : (isStaticCompactTable ? 
staticColumns().selectOrderIterator() + : regularAndStaticColumns.selectOrderIterator()); + + protected ColumnMetadata computeNext() + { + if (partitionKeyIter.hasNext()) + return partitionKeyIter.next(); + + if (clusteringIter.hasNext()) + return clusteringIter.next(); + + return otherColumns.hasNext() ? otherColumns.next() : endOfData(); + } + }; + } + + /** + * Returns the ColumnMetadata for {@code name}. + */ + public ColumnMetadata getColumn(ColumnIdentifier name) + { + return columns.get(name.bytes); + } + + /* + * In general it is preferable to work with ColumnIdentifier to make it + * clear that we are talking about a CQL column, not a cell name, but there + * are a few cases where all we have is a ByteBuffer (when dealing with IndexExpression + * for instance) so... + */ + public ColumnMetadata getColumn(ByteBuffer name) + { + return columns.get(name); + } + + public ColumnMetadata getDroppedColumn(ByteBuffer name) + { + DroppedColumn dropped = droppedColumns.get(name); + return dropped == null ? null : dropped.column; + } + + /** + * Returns a "fake" ColumnMetadata corresponding to the dropped column {@code name}, + * or {@code null} if there is no such dropped column. + * + * @param name - the column name + * @param isStatic - whether the column was a static column, if known + */ + public ColumnMetadata getDroppedColumn(ByteBuffer name, boolean isStatic) + { + DroppedColumn dropped = droppedColumns.get(name); + if (dropped == null) + return null; + + if (isStatic && !dropped.column.isStatic()) + return ColumnMetadata.staticColumn(this, name, dropped.column.type); + + return dropped.column; + } + + public boolean hasStaticColumns() + { + return !staticColumns().isEmpty(); + } + + public void validate() + { + if (!isNameValid(keyspace)) + except("Keyspace name must not be empty, more than %s characters long, or contain non-alphanumeric-underscore characters (got \"%s\")", SchemaConstants.NAME_LENGTH, keyspace); + + if (!isNameValid(name)) + except("Table name must not be empty, more than %s characters long, or contain non-alphanumeric-underscore characters (got \"%s\")", SchemaConstants.NAME_LENGTH, name); + + params.validate(); + + if (partitionKeyColumns.stream().anyMatch(c -> c.type.isCounter())) + except("PRIMARY KEY columns cannot contain counters"); + + // Mixing counter with non counter columns is not supported (#2614) + if (isCounter()) + { + for (ColumnMetadata column : regularAndStaticColumns) + if (!(column.type.isCounter()) && !CompactTables.isSuperColumnMapColumn(column)) + except("Cannot have a non counter column (\"%s\") in a counter table", column.name); + } + else + { + for (ColumnMetadata column : regularAndStaticColumns) + if (column.type.isCounter()) + except("Cannot have a counter column (\"%s\") in a non counter column table", column.name); + } + + // All tables should have a partition key + if (partitionKeyColumns.isEmpty()) + except("Missing partition keys for table %s", toString()); + + // A compact table should always have a clustering + if (isCompactTable() && clusteringColumns.isEmpty()) + except("For table %s, isDense=%b, isCompound=%b, clustering=%s", toString(), isDense(), isCompound(), clusteringColumns); + + if (!indexes.isEmpty() && isSuper()) + except("Secondary indexes are not supported on super column families"); + + indexes.validate(this); + } + + void validateCompatibility(TableMetadata other) + { + if (isIndex()) + return; + + if (!other.keyspace.equals(keyspace)) + except("Keyspace mismatch (found %s; expected %s)", other.keyspace, keyspace); 
+ + if (!other.name.equals(name)) + except("Table mismatch (found %s; expected %s)", other.name, name); + + if (!other.id.equals(id)) + except("Table ID mismatch (found %s; expected %s)", other.id, id); + + if (!other.flags.equals(flags)) + except("Table type mismatch (found %s; expected %s)", other.flags, flags); + + if (other.partitionKeyColumns.size() != partitionKeyColumns.size()) + except("Partition keys of different length (found %s; expected %s)", other.partitionKeyColumns.size(), partitionKeyColumns.size()); + + for (int i = 0; i < partitionKeyColumns.size(); i++) + if (!other.partitionKeyColumns.get(i).type.isCompatibleWith(partitionKeyColumns.get(i).type)) + except("Partition key column mismatch (found %s; expected %s)", other.partitionKeyColumns.get(i).type, partitionKeyColumns.get(i).type); + + if (other.clusteringColumns.size() != clusteringColumns.size()) + except("Clustering columns of different length (found %s; expected %s)", other.clusteringColumns.size(), clusteringColumns.size()); + + for (int i = 0; i < clusteringColumns.size(); i++) + if (!other.clusteringColumns.get(i).type.isCompatibleWith(clusteringColumns.get(i).type)) + except("Clustering column mismatch (found %s; expected %s)", other.clusteringColumns.get(i).type, clusteringColumns.get(i).type); + + for (ColumnMetadata otherColumn : other.regularAndStaticColumns) + { + ColumnMetadata column = getColumn(otherColumn.name); + if (column != null && !otherColumn.type.isCompatibleWith(column.type)) + except("Column mismatch (found %s; expected %s)", otherColumn, column); + } + } + + public ClusteringComparator partitionKeyAsClusteringComparator() + { + return new ClusteringComparator(partitionKeyColumns.stream().map(c -> c.type).collect(toList())); + } + + /** + * The type to use to compare column names in "static compact" + * tables or super column ones. + * <p> + * This exists because for historical reasons, "static compact" tables as + * well as super column ones can have non-UTF8 column names. + * <p> + * This method should only be called for super column tables and "static + * compact" ones. For any other table, all column names are UTF8. + */ + public AbstractType<?> staticCompactOrSuperTableColumnNameType() + { + if (isSuper()) + { + assert compactValueColumn != null && compactValueColumn.type instanceof MapType; + return ((MapType) compactValueColumn.type).nameComparator(); + } + + assert isStaticCompactTable(); + return clusteringColumns.get(0).type; + } + + public AbstractType<?> columnDefinitionNameComparator(ColumnMetadata.Kind kind) + { + return (isSuper() && kind == ColumnMetadata.Kind.REGULAR) || (isStaticCompactTable() && kind == ColumnMetadata.Kind.STATIC) + ? staticCompactOrSuperTableColumnNameType() + : UTF8Type.instance; + } + + /** + * Generate a table name for an index corresponding to the given column. + * This is NOT the same as the index's name! This is only used in sstable filenames and is not exposed to users. + * + * @param info A definition of the column with index + * + * @return name of the index table + */ + public String indexTableName(IndexMetadata info) + { + // TODO simplify this when info.index_name is guaranteed to be set + return name + Directories.SECONDARY_INDEX_NAME_SEPARATOR + info.name; + } + + /** + * @return true if the change being made impacts queries/updates on the table, + * e.g. any columns or indexes were added, removed, or altered; otherwise, false is returned. + * Used to determine whether prepared statements against this table need to be re-prepared. 
+ */ + boolean changeAffectsPreparedStatements(TableMetadata updated) + { + return !partitionKeyColumns.equals(updated.partitionKeyColumns) + || !clusteringColumns.equals(updated.clusteringColumns) + || !regularAndStaticColumns.equals(updated.regularAndStaticColumns) + || !indexes.equals(updated.indexes) + || params.defaultTimeToLive != updated.params.defaultTimeToLive + || params.gcGraceSeconds != updated.params.gcGraceSeconds; + } + + /** + * There are a couple of places in the code where we need a TableMetadata object, don't have one readily available, + * and know that only the keyspace and name matter. This creates such "fake" metadata. Use only if you know what + * you're doing. + */ + public static TableMetadata minimal(String keyspace, String name) + { + return TableMetadata.builder(keyspace, name) + .addPartitionKeyColumn("key", BytesType.instance) + .build(); + } + + public TableMetadata updateIndexTableMetadata(TableParams baseTableParams) + { - TableParams.Builder builder = - baseTableParams.unbuild() - .readRepairChance(0.0) - .dcLocalReadRepairChance(0.0) - .gcGraceSeconds(0); ++ TableParams.Builder builder = baseTableParams.unbuild().gcGraceSeconds(0); + + // Depending on the parent's cache setting, turn on key caching for its index table. + // Row caching is never enabled; see CASSANDRA-5732 + builder.caching(baseTableParams.caching.cacheKeys() ? CachingParams.CACHE_KEYS : CachingParams.CACHE_NOTHING); + + return unbuild().params(builder.build()).build(); + } + + private void except(String format, Object... args) + { + throw new ConfigurationException(keyspace + "." + name + ": " + format(format, args)); + } + + @Override + public boolean equals(Object o) + { + if (this == o) + return true; + + if (!(o instanceof TableMetadata)) + return false; + + TableMetadata tm = (TableMetadata) o; + + return keyspace.equals(tm.keyspace) + && name.equals(tm.name) + && id.equals(tm.id) + && partitioner.equals(tm.partitioner) + && params.equals(tm.params) + && flags.equals(tm.flags) + && isView == tm.isView + && columns.equals(tm.columns) + && droppedColumns.equals(tm.droppedColumns) + && indexes.equals(tm.indexes) + && triggers.equals(tm.triggers); + } + + @Override + public int hashCode() + { + return Objects.hash(keyspace, name, id, partitioner, params, flags, isView, columns, droppedColumns, indexes, triggers); + } + + @Override + public String toString() + { + return String.format("%s.%s", ColumnIdentifier.maybeQuote(keyspace), ColumnIdentifier.maybeQuote(name)); + } + + public String toDebugString() + { + return MoreObjects.toStringHelper(this) + .add("keyspace", keyspace) + .add("table", name) + .add("id", id) + .add("partitioner", partitioner) + .add("params", params) + .add("flags", flags) + .add("isView", isView) + .add("columns", columns()) + .add("droppedColumns", droppedColumns.values()) + .add("indexes", indexes) + .add("triggers", triggers) + .toString(); + } + + public static final class Builder + { + final String keyspace; + final String name; + + private TableId id; + + private IPartitioner partitioner; + private TableParams.Builder params = TableParams.builder(); + + // Set COMPOUND by default, as "normal" CQL tables are compound and that's what we want by default + private Set<Flag> flags = EnumSet.of(Flag.COMPOUND); + private Triggers triggers = Triggers.none(); + private Indexes indexes = Indexes.none(); + + private final Map<ByteBuffer, DroppedColumn> droppedColumns = new HashMap<>(); + private final Map<ByteBuffer, ColumnMetadata> columns = new HashMap<>(); + private final 
List<ColumnMetadata> partitionKeyColumns = new ArrayList<>(); + private final List<ColumnMetadata> clusteringColumns = new ArrayList<>(); + private final List<ColumnMetadata> regularAndStaticColumns = new ArrayList<>(); + + private boolean isView; + + private Builder(String keyspace, String name, TableId id) + { + this.keyspace = keyspace; + this.name = name; + this.id = id; + } + + private Builder(String keyspace, String name) + { + this.keyspace = keyspace; + this.name = name; + } + + public TableMetadata build() + { + if (partitioner == null) + partitioner = DatabaseDescriptor.getPartitioner(); + + if (id == null) + id = TableId.generate(); + + return new TableMetadata(this); + } + + public Builder id(TableId val) + { + id = val; + return this; + } + + public Builder partitioner(IPartitioner val) + { + partitioner = val; + return this; + } + + public Builder params(TableParams val) + { + params = val.unbuild(); + return this; + } + + public Builder bloomFilterFpChance(double val) + { + params.bloomFilterFpChance(val); + return this; + } + + public Builder caching(CachingParams val) + { + params.caching(val); + return this; + } + + public Builder comment(String val) + { + params.comment(val); + return this; + } + + public Builder compaction(CompactionParams val) + { + params.compaction(val); + return this; + } + + public Builder compression(CompressionParams val) + { + params.compression(val); + return this; + } + - public Builder dcLocalReadRepairChance(double val) - { - params.dcLocalReadRepairChance(val); - return this; - } - + public Builder defaultTimeToLive(int val) + { + params.defaultTimeToLive(val); + return this; + } + + public Builder gcGraceSeconds(int val) + { + params.gcGraceSeconds(val); + return this; + } + + public Builder maxIndexInterval(int val) + { + params.maxIndexInterval(val); + return this; + } + + public Builder memtableFlushPeriod(int val) + { + params.memtableFlushPeriodInMs(val); + return this; + } + + public Builder minIndexInterval(int val) + { + params.minIndexInterval(val); + return this; + } + - public Builder readRepairChance(double val) - { - params.readRepairChance(val); - return this; - } - + public Builder crcCheckChance(double val) + { + params.crcCheckChance(val); + return this; + } + + public Builder speculativeRetry(SpeculativeRetryPolicy val) + { + params.speculativeRetry(val); + return this; + } + + public Builder extensions(Map<String, ByteBuffer> val) + { + params.extensions(val); + return this; + } + + public Builder isView(boolean val) + { + isView = val; + return this; + } + + public Builder flags(Set<Flag> val) + { + flags = val; + return this; + } + + public Builder isSuper(boolean val) + { + return flag(Flag.SUPER, val); + } + + public Builder isCounter(boolean val) + { + return flag(Flag.COUNTER, val); + } + + public Builder isDense(boolean val) + { + return flag(Flag.DENSE, val); + } + + public Builder isCompound(boolean val) + { + return flag(Flag.COMPOUND, val); + } + + private Builder flag(Flag flag, boolean set) + { + if (set) flags.add(flag); else flags.remove(flag); + return this; + } + + public Builder triggers(Triggers val) + { + triggers = val; + return this; + } + + public Builder indexes(Indexes val) + { + indexes = val; + return this; + } + + public Builder addPartitionKeyColumn(String name, AbstractType type) + { + return addPartitionKeyColumn(ColumnIdentifier.getInterned(name, false), type); + } + + public Builder addPartitionKeyColumn(ColumnIdentifier name, AbstractType type) + { + return addColumn(new 
ColumnMetadata(keyspace, this.name, name, type, partitionKeyColumns.size(), ColumnMetadata.Kind.PARTITION_KEY)); + } + + public Builder addClusteringColumn(String name, AbstractType type) + { + return addClusteringColumn(ColumnIdentifier.getInterned(name, false), type); + } + + public Builder addClusteringColumn(ColumnIdentifier name, AbstractType type) + { + return addColumn(new ColumnMetadata(keyspace, this.name, name, type, clusteringColumns.size(), ColumnMetadata.Kind.CLUSTERING)); + } + + public Builder addRegularColumn(String name, AbstractType type) + { + return addRegularColumn(ColumnIdentifier.getInterned(name, false), type); + } + + public Builder addRegularColumn(ColumnIdentifier name, AbstractType type) + { + return addColumn(new ColumnMetadata(keyspace, this.name, name, type, ColumnMetadata.NO_POSITION, ColumnMetadata.Kind.REGULAR)); + } + + public Builder addStaticColumn(String name, AbstractType type) + { + return addStaticColumn(ColumnIdentifier.getInterned(name, false), type); + } + + public Builder addStaticColumn(ColumnIdentifier name, AbstractType type) + { + return addColumn(new ColumnMetadata(keyspace, this.name, name, type, ColumnMetadata.NO_POSITION, ColumnMetadata.Kind.STATIC)); + } + + public Builder addColumn(ColumnMetadata column) + { + if (columns.containsKey(column.name.bytes)) + throw new IllegalArgumentException(); + + switch (column.kind) + { + case PARTITION_KEY: + partitionKeyColumns.add(column); + Collections.sort(partitionKeyColumns); + break; + case CLUSTERING: + column.type.checkComparable(); + clusteringColumns.add(column); + Collections.sort(clusteringColumns); + break; + default: + regularAndStaticColumns.add(column); + } + + columns.put(column.name.bytes, column); + + return this; + } + + public Builder addColumns(Iterable<ColumnMetadata> columns) + { + columns.forEach(this::addColumn); + return this; + } + + public Builder droppedColumns(Map<ByteBuffer, DroppedColumn> droppedColumns) + { + this.droppedColumns.clear(); + this.droppedColumns.putAll(droppedColumns); + return this; + } + + /** + * Records a deprecated column for a system table. + */ + public Builder recordDeprecatedSystemColumn(String name, AbstractType<?> type) + { + // As we play fast and loose with the removal timestamp, make sure this is not misused for a non-system table. + assert SchemaConstants.isLocalSystemKeyspace(keyspace); + recordColumnDrop(ColumnMetadata.regularColumn(keyspace, this.name, name, type), Long.MAX_VALUE); + return this; + } + + public Builder recordColumnDrop(ColumnMetadata column, long timeMicros) + { + droppedColumns.put(column.name.bytes, new DroppedColumn(column, timeMicros)); + return this; + } + + public Iterable<ColumnMetadata> columns() + { + return columns.values(); + } + + public Set<String> columnNames() + { + return columns.values().stream().map(c -> c.name.toString()).collect(toSet()); + } + + public ColumnMetadata getColumn(ColumnIdentifier identifier) + { + return columns.get(identifier.bytes); + } + + public ColumnMetadata getColumn(ByteBuffer name) + { + return columns.get(name); + } + + public boolean hasRegularColumns() + { + return regularAndStaticColumns.stream().anyMatch(ColumnMetadata::isRegular); + } + + /* + * The following methods all assume a Builder with a valid set of partition key, clustering, regular and static columns. 
+ */ + + public Builder removeRegularOrStaticColumn(ColumnIdentifier identifier) + { + ColumnMetadata column = columns.get(identifier.bytes); + if (column == null || column.isPrimaryKeyColumn()) + throw new IllegalArgumentException(); + + columns.remove(identifier.bytes); + regularAndStaticColumns.remove(column); + + return this; + } + + public Builder renamePrimaryKeyColumn(ColumnIdentifier from, ColumnIdentifier to) + { + if (columns.containsKey(to.bytes)) + throw new IllegalArgumentException(); + + ColumnMetadata column = columns.get(from.bytes); + if (column == null || !column.isPrimaryKeyColumn()) + throw new IllegalArgumentException(); + + ColumnMetadata newColumn = column.withNewName(to); + if (column.isPartitionKey()) + partitionKeyColumns.set(column.position(), newColumn); + else + clusteringColumns.set(column.position(), newColumn); + + columns.remove(from.bytes); + columns.put(to.bytes, newColumn); + + return this; + } + + public Builder alterColumnType(ColumnIdentifier name, AbstractType<?> type) + { + ColumnMetadata column = columns.get(name.bytes); + if (column == null) + throw new IllegalArgumentException(); + + ColumnMetadata newColumn = column.withNewType(type); + + switch (column.kind) + { + case PARTITION_KEY: + partitionKeyColumns.set(column.position(), newColumn); + break; + case CLUSTERING: + clusteringColumns.set(column.position(), newColumn); + break; + case REGULAR: + case STATIC: + regularAndStaticColumns.remove(column); + regularAndStaticColumns.add(newColumn); + break; + } + + columns.put(column.name.bytes, newColumn); + + return this; + } + } + + /** + * A table with strict liveness filters/ignores rows without PK liveness info, + * effectively tying the row liveness to its primary key liveness. + * + * Currently this is only used by views that have a normal base column as a PK column, + * so updates to other columns do not make the row live when the base column + * is not live. See CASSANDRA-11500. 
+ */ + public boolean enforceStrictLiveness() + { + return isView && Keyspace.open(keyspace).viewManager.getByName(name).enforceStrictLiveness(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad99802/src/java/org/apache/cassandra/schema/TableParams.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/schema/TableParams.java index b7f6806,3750aa1..78dc894 --- a/src/java/org/apache/cassandra/schema/TableParams.java +++ b/src/java/org/apache/cassandra/schema/TableParams.java @@@ -43,14 -41,14 +43,12 @@@ public final class TableParam COMMENT, COMPACTION, COMPRESSION, -- DCLOCAL_READ_REPAIR_CHANCE, DEFAULT_TIME_TO_LIVE, EXTENSIONS, GC_GRACE_SECONDS, MAX_INDEX_INTERVAL, MEMTABLE_FLUSH_PERIOD_IN_MS, MIN_INDEX_INTERVAL, -- READ_REPAIR_CHANCE, SPECULATIVE_RETRY, CRC_CHECK_CHANCE, CDC; @@@ -62,9 -60,19 +60,7 @@@ } } - public static final String DEFAULT_COMMENT = ""; - public static final double DEFAULT_READ_REPAIR_CHANCE = 0.0; - public static final double DEFAULT_DCLOCAL_READ_REPAIR_CHANCE = 0.1; - public static final int DEFAULT_GC_GRACE_SECONDS = 864000; // 10 days - public static final int DEFAULT_DEFAULT_TIME_TO_LIVE = 0; - public static final int DEFAULT_MEMTABLE_FLUSH_PERIOD_IN_MS = 0; - public static final int DEFAULT_MIN_INDEX_INTERVAL = 128; - public static final int DEFAULT_MAX_INDEX_INTERVAL = 2048; - public static final double DEFAULT_CRC_CHECK_CHANCE = 1.0; - public final String comment; -- public final double readRepairChance; -- public final double dcLocalReadRepairChance; public final double bloomFilterFpChance; public final double crcCheckChance; public final int gcGraceSeconds; @@@ -82,8 -90,8 +78,6 @@@ private TableParams(Builder builder) { comment = builder.comment; -- readRepairChance = builder.readRepairChance; -- dcLocalReadRepairChance = builder.dcLocalReadRepairChance; bloomFilterFpChance = builder.bloomFilterFpChance == null ? 
builder.compaction.defaultBloomFilterFbChance() : builder.bloomFilterFpChance; @@@ -113,14 -121,14 +107,12 @@@ .comment(params.comment) .compaction(params.compaction) .compression(params.compression) -- .dcLocalReadRepairChance(params.dcLocalReadRepairChance) .crcCheckChance(params.crcCheckChance) .defaultTimeToLive(params.defaultTimeToLive) .gcGraceSeconds(params.gcGraceSeconds) .maxIndexInterval(params.maxIndexInterval) .memtableFlushPeriodInMs(params.memtableFlushPeriodInMs) .minIndexInterval(params.minIndexInterval) -- .readRepairChance(params.readRepairChance) .speculativeRetry(params.speculativeRetry) .extensions(params.extensions) .cdc(params.cdc); @@@ -145,20 -148,20 +137,6 @@@ bloomFilterFpChance); } -- if (dcLocalReadRepairChance < 0 || dcLocalReadRepairChance > 1.0) -- { -- fail("%s must be larger than or equal to 0 and smaller than or equal to 1.0 (got %s)", -- Option.DCLOCAL_READ_REPAIR_CHANCE, -- dcLocalReadRepairChance); -- } -- -- if (readRepairChance < 0 || readRepairChance > 1.0) -- { -- fail("%s must be larger than or equal to 0 and smaller than or equal to 1.0 (got %s)", -- Option.READ_REPAIR_CHANCE, -- readRepairChance); -- } -- if (crcCheckChance < 0 || crcCheckChance > 1.0) { fail("%s must be larger than or equal to 0 and smaller than or equal to 1.0 (got %s)", @@@ -208,8 -211,8 +186,6 @@@ TableParams p = (TableParams) o; return comment.equals(p.comment) -- && readRepairChance == p.readRepairChance -- && dcLocalReadRepairChance == p.dcLocalReadRepairChance && bloomFilterFpChance == p.bloomFilterFpChance && crcCheckChance == p.crcCheckChance && gcGraceSeconds == p.gcGraceSeconds @@@ -229,8 -232,8 +205,6 @@@ public int hashCode() { return Objects.hashCode(comment, -- readRepairChance, -- dcLocalReadRepairChance, bloomFilterFpChance, crcCheckChance, gcGraceSeconds, @@@ -251,8 -254,8 +225,6 @@@ { return MoreObjects.toStringHelper(this) .add(Option.COMMENT.toString(), comment) -- .add(Option.READ_REPAIR_CHANCE.toString(), readRepairChance) -- .add(Option.DCLOCAL_READ_REPAIR_CHANCE.toString(), dcLocalReadRepairChance) .add(Option.BLOOM_FILTER_FP_CHANCE.toString(), bloomFilterFpChance) .add(Option.CRC_CHECK_CHANCE.toString(), crcCheckChance) .add(Option.GC_GRACE_SECONDS.toString(), gcGraceSeconds) @@@ -271,17 -274,17 +243,15 @@@ public static final class Builder { - private String comment = DEFAULT_COMMENT; - private double readRepairChance = DEFAULT_READ_REPAIR_CHANCE; - private double dcLocalReadRepairChance = DEFAULT_DCLOCAL_READ_REPAIR_CHANCE; + private String comment = ""; - private double readRepairChance = 0.0; - private double dcLocalReadRepairChance = 0.1; private Double bloomFilterFpChance; - public Double crcCheckChance = DEFAULT_CRC_CHECK_CHANCE; - private int gcGraceSeconds = DEFAULT_GC_GRACE_SECONDS; - private int defaultTimeToLive = DEFAULT_DEFAULT_TIME_TO_LIVE; - private int memtableFlushPeriodInMs = DEFAULT_MEMTABLE_FLUSH_PERIOD_IN_MS; - private int minIndexInterval = DEFAULT_MIN_INDEX_INTERVAL; - private int maxIndexInterval = DEFAULT_MAX_INDEX_INTERVAL; - private SpeculativeRetryParam speculativeRetry = SpeculativeRetryParam.DEFAULT; + public Double crcCheckChance = 1.0; + private int gcGraceSeconds = 864000; // 10 days + private int defaultTimeToLive = 0; + private int memtableFlushPeriodInMs = 0; + private int minIndexInterval = 128; + private int maxIndexInterval = 2048; + private SpeculativeRetryPolicy speculativeRetry = PercentileSpeculativeRetryPolicy.NINETY_NINE_P; private CachingParams caching = CachingParams.DEFAULT; private CompactionParams 
compaction = CompactionParams.DEFAULT; private CompressionParams compression = CompressionParams.DEFAULT; @@@ -303,18 -306,18 +273,6 @@@ return this; } -- public Builder readRepairChance(double val) -- { -- readRepairChance = val; -- return this; -- } -- -- public Builder dcLocalReadRepairChance(double val) -- { -- dcLocalReadRepairChance = val; -- return this; -- } -- public Builder bloomFilterFpChance(double val) { bloomFilterFpChance = val; http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad99802/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java ---------------------------------------------------------------------- diff --cc src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java index f45e623,0000000..6c8a45a mode 100644,000000..100644 --- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java @@@ -1,481 -1,0 +1,436 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.cassandra.service.reads; + +import java.util.List; - import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.concurrent.StageManager; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.ConsistencyLevel; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.SinglePartitionReadCommand; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.exceptions.ReadFailureException; +import org.apache.cassandra.exceptions.ReadTimeoutException; +import org.apache.cassandra.exceptions.UnavailableException; +import org.apache.cassandra.locator.InetAddressAndPort; - import org.apache.cassandra.metrics.ReadRepairMetrics; +import org.apache.cassandra.net.MessageOut; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.service.StorageProxy; +import org.apache.cassandra.service.reads.repair.ReadRepair; - import org.apache.cassandra.schema.TableMetadata; +import org.apache.cassandra.service.StorageProxy.LocalReadRunnable; +import org.apache.cassandra.tracing.TraceState; +import org.apache.cassandra.tracing.Tracing; + +/** + * Sends a read request to the replicas needed to satisfy a given ConsistencyLevel. 
+ * + * Optionally, may perform additional requests to provide redundancy against replica failure: + * AlwaysSpeculatingReadExecutor will always send a request to one extra replica, while + * SpeculatingReadExecutor will wait until it looks like the original request is in danger + * of timing out before performing extra reads. + */ +public abstract class AbstractReadExecutor +{ + private static final Logger logger = LoggerFactory.getLogger(AbstractReadExecutor.class); + + protected final ReadCommand command; + protected final ConsistencyLevel consistency; + protected final List<InetAddressAndPort> targetReplicas; + protected final ReadRepair readRepair; + protected final DigestResolver digestResolver; + protected final ReadCallback handler; + protected final TraceState traceState; + protected final ColumnFamilyStore cfs; + protected final long queryStartNanoTime; + protected volatile PartitionIterator result = null; + + AbstractReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistency, List<InetAddressAndPort> targetReplicas, long queryStartNanoTime) + { + this.command = command; + this.consistency = consistency; + this.targetReplicas = targetReplicas; + this.readRepair = ReadRepair.create(command, targetReplicas, queryStartNanoTime, consistency); + this.digestResolver = new DigestResolver(keyspace, command, consistency, readRepair, targetReplicas.size()); + this.handler = new ReadCallback(digestResolver, consistency, command, targetReplicas, queryStartNanoTime, readRepair); + this.cfs = cfs; + this.traceState = Tracing.instance.get(); + this.queryStartNanoTime = queryStartNanoTime; + + // Set the digest version (if we request some digests). This is the smallest version amongst all our target replicas since new nodes + // know how to produce older digests but the reverse is not true. + // TODO: we need this when talking with pre-3.0 nodes. So if we preserve the digest format moving forward, we can get rid of this once + // we stop being compatible with pre-3.0 nodes. + int digestVersion = MessagingService.current_version; + for (InetAddressAndPort replica : targetReplicas) + digestVersion = Math.min(digestVersion, MessagingService.instance().getVersion(replica)); + command.setDigestVersion(digestVersion); + } + + private DecoratedKey getKey() + { + if (command instanceof SinglePartitionReadCommand) + { + return ((SinglePartitionReadCommand) command).partitionKey(); + } + else + { + return null; + } + } + + protected void makeDataRequests(Iterable<InetAddressAndPort> endpoints) + { + makeRequests(command, endpoints); + } + + protected void makeDigestRequests(Iterable<InetAddressAndPort> endpoints) + { + makeRequests(command.copyAsDigestQuery(), endpoints); + } + + private void makeRequests(ReadCommand readCommand, Iterable<InetAddressAndPort> endpoints) + { + boolean hasLocalEndpoint = false; + + for (InetAddressAndPort endpoint : endpoints) + { + if (StorageProxy.canDoLocalRequest(endpoint)) + { + hasLocalEndpoint = true; + continue; + } + + if (traceState != null) + traceState.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint); + logger.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint); + MessageOut<ReadCommand> message = readCommand.createMessage(); + MessagingService.instance().sendRRWithFailure(message, endpoint, handler); + } + + // We delay the local (potentially blocking) read until the end to avoid stalling remote requests. 
+ if (hasLocalEndpoint) + { + logger.trace("reading {} locally", readCommand.isDigestQuery() ? "digest" : "data"); + StageManager.getStage(Stage.READ).maybeExecuteImmediately(new LocalReadRunnable(command, handler)); + } + } + + /** + * Perform additional requests if it looks like the original will time out. May block while it waits + * to see if the original requests are answered first. + */ + public abstract void maybeTryAdditionalReplicas(); + + /** + * Get the replicas involved in the [finished] request. + * + * @return target replicas + the extra replica, *IF* we speculated. + */ + public abstract List<InetAddressAndPort> getContactedReplicas(); + + /** + * send the initial set of requests + */ + public abstract void executeAsync(); + - private static ReadRepairDecision newReadRepairDecision(TableMetadata metadata) - { - if (metadata.params.readRepairChance > 0d || - metadata.params.dcLocalReadRepairChance > 0) - { - double chance = ThreadLocalRandom.current().nextDouble(); - if (metadata.params.readRepairChance > chance) - return ReadRepairDecision.GLOBAL; - - if (metadata.params.dcLocalReadRepairChance > chance) - return ReadRepairDecision.DC_LOCAL; - } - - return ReadRepairDecision.NONE; - } - + /** + * @return an executor appropriate for the configured speculative read policy + */ + public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws UnavailableException + { + Keyspace keyspace = Keyspace.open(command.metadata().keyspace); + List<InetAddressAndPort> allReplicas = StorageProxy.getLiveSortedEndpoints(keyspace, command.partitionKey()); - // 11980: Excluding EACH_QUORUM reads from potential RR, so that we do not miscount DC responses - ReadRepairDecision repairDecision = consistencyLevel == ConsistencyLevel.EACH_QUORUM - ? ReadRepairDecision.NONE - : newReadRepairDecision(command.metadata()); - List<InetAddressAndPort> targetReplicas = consistencyLevel.filterForQuery(keyspace, allReplicas, repairDecision); ++ List<InetAddressAndPort> targetReplicas = consistencyLevel.filterForQuery(keyspace, allReplicas); + + // Throw UAE early if we don't have enough replicas. + consistencyLevel.assureSufficientLiveNodes(keyspace, targetReplicas); + - if (repairDecision != ReadRepairDecision.NONE) - { - Tracing.trace("Read-repair {}", repairDecision); - ReadRepairMetrics.attempted.mark(); - } - + ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id); + SpeculativeRetryPolicy retry = cfs.metadata().params.speculativeRetry; + + // Speculative retry is disabled *OR* + // 11980: Disable speculative retry if using EACH_QUORUM in order to prevent miscounting DC responses - if (retry.equals(NeverSpeculativeRetryPolicy.INSTANCE) - | consistencyLevel == ConsistencyLevel.EACH_QUORUM) ++ if (retry.equals(NeverSpeculativeRetryPolicy.INSTANCE) || consistencyLevel == ConsistencyLevel.EACH_QUORUM) + return new NeverSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime, false); + + // There are simply no extra replicas to speculate. + // Handle this separately so it can log failed attempts to speculate due to lack of replicas + if (consistencyLevel.blockFor(keyspace) == allReplicas.size()) + return new NeverSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime, true); + + if (targetReplicas.size() == allReplicas.size()) + { - // CL.ALL, RRD.GLOBAL or RRD.DC_LOCAL and a single-DC. 
+    /**
+     * Returns true if speculation should occur; if it should, blocks until it is time to
+     * send the speculative reads.
+     */
+    boolean shouldSpeculateAndMaybeWait()
+    {
+        // no latency information, or we're overloaded
+        if (cfs.sampleLatencyNanos > TimeUnit.MILLISECONDS.toNanos(command.getTimeout()))
+            return false;
+
+        return !handler.await(cfs.sampleLatencyNanos, TimeUnit.NANOSECONDS);
+    }
+
+    void onReadTimeout() {}
+
+    public static class NeverSpeculatingReadExecutor extends AbstractReadExecutor
+    {
+        /**
+         * If we are never speculating due to a lack of replicas, log it as a failure:
+         * the speculation should have happened but couldn't, for lack of replicas.
+         */
+        private final boolean logFailedSpeculation;
+
+        public NeverSpeculatingReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddressAndPort> targetReplicas, long queryStartNanoTime, boolean logFailedSpeculation)
+        {
+            super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
+            this.logFailedSpeculation = logFailedSpeculation;
+        }
+
+        public void executeAsync()
+        {
+            makeDataRequests(targetReplicas.subList(0, 1));
+            if (targetReplicas.size() > 1)
+                makeDigestRequests(targetReplicas.subList(1, targetReplicas.size()));
+        }
+
+        public void maybeTryAdditionalReplicas()
+        {
+            if (shouldSpeculateAndMaybeWait() && logFailedSpeculation)
+            {
+                cfs.metric.speculativeInsufficientReplicas.inc();
+            }
+        }
+
+        public List<InetAddressAndPort> getContactedReplicas()
+        {
+            return targetReplicas;
+        }
+    }
+
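NeverSpeculatingReadExecutor.executeAsync() shows the canonical read fan-out: exactly one replica returns full data, and every other contacted replica returns only a digest to compare against it. Sketched in isolation with simplified types (this helper is not part of the patch):

    import java.util.List;
    import java.util.function.Consumer;

    final class ReadSplit
    {
        // The first (closest) replica serves the full data read; the rest only
        // hash their result, saving bandwidth while still validating consistency.
        static void split(List<String> replicas,
                          Consumer<List<String>> dataRequests,
                          Consumer<List<String>> digestRequests)
        {
            dataRequests.accept(replicas.subList(0, 1));
            if (replicas.size() > 1)
                digestRequests.accept(replicas.subList(1, replicas.size()));
        }
    }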
+    static class SpeculatingReadExecutor extends AbstractReadExecutor
+    {
+        private volatile boolean speculated = false;
+
+        public SpeculatingReadExecutor(Keyspace keyspace,
+                                       ColumnFamilyStore cfs,
+                                       ReadCommand command,
+                                       ConsistencyLevel consistencyLevel,
+                                       List<InetAddressAndPort> targetReplicas,
+                                       long queryStartNanoTime)
+        {
+            super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
+        }
+
+        public void executeAsync()
+        {
+            // if CL + RR result in covering all replicas, getReadExecutor forces AlwaysSpeculating. So we know
+            // that the last replica in our list is "extra."
+            List<InetAddressAndPort> initialReplicas = targetReplicas.subList(0, targetReplicas.size() - 1);
+
+            if (handler.blockfor < initialReplicas.size())
+            {
+                // We're hitting additional targets for read repair. Since our "extra" replica is the least-
+                // preferred by the snitch, we do an extra data read to start with against a replica more
+                // likely to reply; better to let RR fail than the entire query.
+                makeDataRequests(initialReplicas.subList(0, 2));
+                if (initialReplicas.size() > 2)
+                    makeDigestRequests(initialReplicas.subList(2, initialReplicas.size()));
+            }
+            else
+            {
+                // not doing read repair; all replies are important, so it doesn't matter which nodes we
+                // perform data reads against vs digest.
+                makeDataRequests(initialReplicas.subList(0, 1));
+                if (initialReplicas.size() > 1)
+                    makeDigestRequests(initialReplicas.subList(1, initialReplicas.size()));
+            }
+        }
+
+        public void maybeTryAdditionalReplicas()
+        {
+            if (shouldSpeculateAndMaybeWait())
+            {
+                // Handle speculation stats first in case the callback fires immediately
+                speculated = true;
+                cfs.metric.speculativeRetries.inc();
+                // Could be waiting on the data, or on enough digests.
+                ReadCommand retryCommand = command;
+                if (handler.resolver.isDataPresent())
+                    retryCommand = command.copyAsDigestQuery();
+
+                InetAddressAndPort extraReplica = Iterables.getLast(targetReplicas);
+                if (traceState != null)
+                    traceState.trace("speculating read retry on {}", extraReplica);
+                logger.trace("speculating read retry on {}", extraReplica);
+                MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), extraReplica, handler);
+            }
+        }
+
+        public List<InetAddressAndPort> getContactedReplicas()
+        {
+            return speculated
+                 ? targetReplicas
+                 : targetReplicas.subList(0, targetReplicas.size() - 1);
+        }
+
+        @Override
+        void onReadTimeout()
+        {
+            // Shouldn't be possible to get here without first attempting to speculate, even if the
+            // timing is bad
+            assert speculated;
+            cfs.metric.speculativeFailedRetries.inc();
+        }
+    }
+
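shouldSpeculateAndMaybeWait() is the heart of latency-triggered speculation: wait for roughly the table's sampled latency, and only speculate if the original reads are still outstanding. A self-contained approximation using a CountDownLatch in place of ReadCallback (names and thresholds are illustrative, not the patch's API):

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    final class SpeculationGate
    {
        // Returns true when it is time to send speculative reads: the sampled
        // latency window elapsed and the original responses have not all arrived.
        static boolean shouldSpeculate(CountDownLatch responses,
                                       long sampleLatencyNanos,
                                       long timeoutNanos) throws InterruptedException
        {
            if (sampleLatencyNanos > timeoutNanos)
                return false; // no useful latency data, or the node is overloaded

            // await() returning false means the original reads are still outstanding
            return !responses.await(sampleLatencyNanos, TimeUnit.NANOSECONDS);
        }
    }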
+    private static class AlwaysSpeculatingReadExecutor extends AbstractReadExecutor
+    {
+        public AlwaysSpeculatingReadExecutor(Keyspace keyspace,
+                                             ColumnFamilyStore cfs,
+                                             ReadCommand command,
+                                             ConsistencyLevel consistencyLevel,
+                                             List<InetAddressAndPort> targetReplicas,
+                                             long queryStartNanoTime)
+        {
+            super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
+        }
+
+        public void maybeTryAdditionalReplicas()
+        {
+            // no-op
+        }
+
+        public List<InetAddressAndPort> getContactedReplicas()
+        {
+            return targetReplicas;
+        }
+
+        @Override
+        public void executeAsync()
+        {
+            makeDataRequests(targetReplicas.subList(0, targetReplicas.size() > 1 ? 2 : 1));
+            if (targetReplicas.size() > 2)
+                makeDigestRequests(targetReplicas.subList(2, targetReplicas.size()));
+            cfs.metric.speculativeRetries.inc();
+        }
+
+        @Override
+        void onReadTimeout()
+        {
+            cfs.metric.speculativeFailedRetries.inc();
+        }
+    }
+
+    public void setResult(PartitionIterator result)
+    {
+        Preconditions.checkState(this.result == null, "Result can only be set once");
+        this.result = result;
+    }
+
+    /**
+     * Wait for the CL to be satisfied by responses
+     */
+    public void awaitResponses() throws ReadTimeoutException
+    {
+        try
+        {
+            handler.awaitResults();
+        }
+        catch (ReadTimeoutException e)
+        {
+            try
+            {
+                onReadTimeout();
+            }
+            finally
+            {
+                throw e;
+            }
+        }
+
+        // return immediately, or begin a read repair
+        if (digestResolver.responsesMatch())
+        {
+            setResult(digestResolver.getData());
+        }
+        else
+        {
+            Tracing.trace("Digest mismatch: Mismatch for key {}", getKey());
-            readRepair.startForegroundRepair(digestResolver, handler.endpoints, getContactedReplicas(), this::setResult);
++            readRepair.startRepair(digestResolver, handler.endpoints, getContactedReplicas(), this::setResult);
+        }
+    }
+
+    public void awaitReadRepair() throws ReadTimeoutException
+    {
+        try
+        {
-            readRepair.awaitForegroundRepairFinish();
++            readRepair.awaitRepair();
+        }
+        catch (ReadTimeoutException e)
+        {
+            if (Tracing.isTracing())
+                Tracing.trace("Timed out waiting on digest mismatch repair requests");
+            else
+                logger.trace("Timed out waiting on digest mismatch repair requests");
+            // the caught exception here will have CL.ALL from the repair command,
+            // not whatever CL the initial command was at (CASSANDRA-7947)
+            int blockFor = consistency.blockFor(Keyspace.open(command.metadata().keyspace));
+            throw new ReadTimeoutException(consistency, blockFor - 1, blockFor, true);
+        }
+    }
+
+    public void maybeRepairAdditionalReplicas()
+    {
+        // TODO: this
+    }
+
+    public PartitionIterator getResult() throws ReadFailureException, ReadTimeoutException
+    {
+        Preconditions.checkState(result != null, "Result must be set first");
+        return result;
+    }
+}
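For context on the digest-mismatch branch in awaitResponses(): a digest read ships back a hash of the result rather than the rows themselves, and read repair only kicks in when the hashes disagree. A standalone illustration of the comparison; MD5 is what Cassandra's read digests have historically used, but this helper is not the patch's code:

    import java.security.MessageDigest;
    import java.security.NoSuchAlgorithmException;
    import java.util.Arrays;
    import java.util.List;

    final class DigestCompare
    {
        // Hash each serialized response and check that all hashes agree; any
        // mismatch is the "digest mismatch" case that triggers read repair.
        static boolean responsesMatch(List<byte[]> serializedResponses) throws NoSuchAlgorithmException
        {
            byte[] reference = null;
            for (byte[] response : serializedResponses)
            {
                byte[] digest = MessageDigest.getInstance("MD5").digest(response);
                if (reference == null)
                    reference = digest;
                else if (!Arrays.equals(reference, digest))
                    return false; // replicas diverge; coordinator must reconcile
            }
            return true;
        }
    }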
