http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad99802/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index e88f037,164e32d..ca3c69f
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@@ -102,167 -104,170 +102,166 @@@ public final class SchemaKeyspac
       */
      private static final Set<String> TABLES_WITH_CDC_ADDED = ImmutableSet.of(TABLES, VIEWS);
  
 -
 -    /**
 -     * Until we upgrade the messaging service version, that is version 4.0, we must preserve the old order (before CASSANDRA-12213)
 -     * for digest calculations, otherwise the nodes will never agree on the schema during a rolling upgrade, see CASSANDRA-13559.
 -     */
 -    public static final ImmutableList<String> ALL_FOR_DIGEST =
 -        ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, VIEWS, TYPES, FUNCTIONS, AGGREGATES, INDEXES);
 -
 -    private static final CFMetaData Keyspaces =
 -        compile(KEYSPACES,
 -                "keyspace definitions",
 -                "CREATE TABLE %s ("
 -                + "keyspace_name text,"
 -                + "durable_writes boolean,"
 -                + "replication frozen<map<text, text>>,"
 -                + "PRIMARY KEY ((keyspace_name)))");
 -
 -    private static final CFMetaData Tables =
 -        compile(TABLES,
 -                "table definitions",
 -                "CREATE TABLE %s ("
 -                + "keyspace_name text,"
 -                + "table_name text,"
 -                + "bloom_filter_fp_chance double,"
 -                + "caching frozen<map<text, text>>,"
 -                + "comment text,"
 -                + "compaction frozen<map<text, text>>,"
 -                + "compression frozen<map<text, text>>,"
 -                + "crc_check_chance double,"
 -                + "dclocal_read_repair_chance double,"
 -                + "default_time_to_live int,"
 -                + "extensions frozen<map<text, blob>>,"
 -                + "flags frozen<set<text>>," // SUPER, COUNTER, DENSE, COMPOUND
 -                + "gc_grace_seconds int,"
 -                + "id uuid,"
 -                + "max_index_interval int,"
 -                + "memtable_flush_period_in_ms int,"
 -                + "min_index_interval int,"
 -                + "read_repair_chance double,"
 -                + "speculative_retry text,"
 -                + "cdc boolean,"
 -                + "PRIMARY KEY ((keyspace_name), table_name))");
 -
 -    private static final CFMetaData Columns =
 -        compile(COLUMNS,
 -                "column definitions",
 -                "CREATE TABLE %s ("
 -                + "keyspace_name text,"
 -                + "table_name text,"
 -                + "column_name text,"
 -                + "clustering_order text,"
 -                + "column_name_bytes blob,"
 -                + "kind text,"
 -                + "position int,"
 -                + "type text,"
 -                + "PRIMARY KEY ((keyspace_name), table_name, column_name))");
 -
 -    private static final CFMetaData DroppedColumns =
 -        compile(DROPPED_COLUMNS,
 -                "dropped column registry",
 -                "CREATE TABLE %s ("
 -                + "keyspace_name text,"
 -                + "table_name text,"
 -                + "column_name text,"
 -                + "dropped_time timestamp,"
 -                + "type text,"
 -                + "PRIMARY KEY ((keyspace_name), table_name, column_name))");
 -
 -    private static final CFMetaData Triggers =
 -        compile(TRIGGERS,
 -                "trigger definitions",
 -                "CREATE TABLE %s ("
 -                + "keyspace_name text,"
 -                + "table_name text,"
 -                + "trigger_name text,"
 -                + "options frozen<map<text, text>>,"
 -                + "PRIMARY KEY ((keyspace_name), table_name, trigger_name))");
 -
 -    private static final CFMetaData Views =
 -        compile(VIEWS,
 -                "view definitions",
 -                "CREATE TABLE %s ("
 -                + "keyspace_name text,"
 -                + "view_name text,"
 -                + "base_table_id uuid,"
 -                + "base_table_name text,"
 -                + "where_clause text,"
 -                + "bloom_filter_fp_chance double,"
 -                + "caching frozen<map<text, text>>,"
 -                + "comment text,"
 -                + "compaction frozen<map<text, text>>,"
 -                + "compression frozen<map<text, text>>,"
 -                + "crc_check_chance double,"
 -                + "dclocal_read_repair_chance double,"
 -                + "default_time_to_live int,"
 -                + "extensions frozen<map<text, blob>>,"
 -                + "gc_grace_seconds int,"
 -                + "id uuid,"
 -                + "include_all_columns boolean,"
 -                + "max_index_interval int,"
 -                + "memtable_flush_period_in_ms int,"
 -                + "min_index_interval int,"
 -                + "read_repair_chance double,"
 -                + "speculative_retry text,"
 -                + "cdc boolean,"
 -                + "PRIMARY KEY ((keyspace_name), view_name))");
 -
 -    private static final CFMetaData Indexes =
 -        compile(INDEXES,
 -                "secondary index definitions",
 -                "CREATE TABLE %s ("
 -                + "keyspace_name text,"
 -                + "table_name text,"
 -                + "index_name text,"
 -                + "kind text,"
 -                + "options frozen<map<text, text>>,"
 -                + "PRIMARY KEY ((keyspace_name), table_name, index_name))");
 -
 -    private static final CFMetaData Types =
 -        compile(TYPES,
 -                "user defined type definitions",
 -                "CREATE TABLE %s ("
 -                + "keyspace_name text,"
 -                + "type_name text,"
 -                + "field_names frozen<list<text>>,"
 -                + "field_types frozen<list<text>>,"
 -                + "PRIMARY KEY ((keyspace_name), type_name))");
 -
 -    private static final CFMetaData Functions =
 -        compile(FUNCTIONS,
 -                "user defined function definitions",
 -                "CREATE TABLE %s ("
 -                + "keyspace_name text,"
 -                + "function_name text,"
 -                + "argument_types frozen<list<text>>,"
 -                + "argument_names frozen<list<text>>,"
 -                + "body text,"
 -                + "language text,"
 -                + "return_type text,"
 -                + "called_on_null_input boolean,"
 -                + "PRIMARY KEY ((keyspace_name), function_name, argument_types))");
 -
 -    private static final CFMetaData Aggregates =
 -        compile(AGGREGATES,
 -                "user defined aggregate definitions",
 -                "CREATE TABLE %s ("
 -                + "keyspace_name text,"
 -                + "aggregate_name text,"
 -                + "argument_types frozen<list<text>>,"
 -                + "final_func text,"
 -                + "initcond text,"
 -                + "return_type text,"
 -                + "state_func text,"
 -                + "state_type text,"
 -                + "PRIMARY KEY ((keyspace_name), aggregate_name, argument_types))");
 -
 -    public static final List<CFMetaData> ALL_TABLE_METADATA =
 +    private static final TableMetadata Keyspaces =
 +        parse(KEYSPACES,
 +              "keyspace definitions",
 +              "CREATE TABLE %s ("
 +              + "keyspace_name text,"
 +              + "durable_writes boolean,"
 +              + "replication frozen<map<text, text>>,"
 +              + "PRIMARY KEY ((keyspace_name)))");
 +
 +    private static final TableMetadata Tables =
 +        parse(TABLES,
 +              "table definitions",
 +              "CREATE TABLE %s ("
 +              + "keyspace_name text,"
 +              + "table_name text,"
 +              + "bloom_filter_fp_chance double,"
 +              + "caching frozen<map<text, text>>,"
 +              + "comment text,"
 +              + "compaction frozen<map<text, text>>,"
 +              + "compression frozen<map<text, text>>,"
 +              + "crc_check_chance double,"
-               + "dclocal_read_repair_chance double,"
++              + "dclocal_read_repair_chance double," // no longer used, left for drivers' sake
 +              + "default_time_to_live int,"
 +              + "extensions frozen<map<text, blob>>,"
 +              + "flags frozen<set<text>>," // SUPER, COUNTER, DENSE, COMPOUND
 +              + "gc_grace_seconds int,"
 +              + "id uuid,"
 +              + "max_index_interval int,"
 +              + "memtable_flush_period_in_ms int,"
 +              + "min_index_interval int,"
-               + "read_repair_chance double,"
++              + "read_repair_chance double," // no longer used, left for drivers' sake
 +              + "speculative_retry text,"
 +              + "cdc boolean,"
 +              + "PRIMARY KEY ((keyspace_name), table_name))");
 +
 +    private static final TableMetadata Columns =
 +        parse(COLUMNS,
 +              "column definitions",
 +              "CREATE TABLE %s ("
 +              + "keyspace_name text,"
 +              + "table_name text,"
 +              + "column_name text,"
 +              + "clustering_order text,"
 +              + "column_name_bytes blob,"
 +              + "kind text,"
 +              + "position int,"
 +              + "type text,"
 +              + "PRIMARY KEY ((keyspace_name), table_name, column_name))");
 +
 +    private static final TableMetadata DroppedColumns =
 +        parse(DROPPED_COLUMNS,
 +              "dropped column registry",
 +              "CREATE TABLE %s ("
 +              + "keyspace_name text,"
 +              + "table_name text,"
 +              + "column_name text,"
 +              + "dropped_time timestamp,"
 +              + "type text,"
 +              + "kind text,"
 +              + "PRIMARY KEY ((keyspace_name), table_name, column_name))");
 +
 +    private static final TableMetadata Triggers =
 +        parse(TRIGGERS,
 +              "trigger definitions",
 +              "CREATE TABLE %s ("
 +              + "keyspace_name text,"
 +              + "table_name text,"
 +              + "trigger_name text,"
 +              + "options frozen<map<text, text>>,"
 +              + "PRIMARY KEY ((keyspace_name), table_name, trigger_name))");
 +
 +    private static final TableMetadata Views =
 +        parse(VIEWS,
 +              "view definitions",
 +              "CREATE TABLE %s ("
 +              + "keyspace_name text,"
 +              + "view_name text,"
 +              + "base_table_id uuid,"
 +              + "base_table_name text,"
 +              + "where_clause text,"
 +              + "bloom_filter_fp_chance double,"
 +              + "caching frozen<map<text, text>>,"
 +              + "comment text,"
 +              + "compaction frozen<map<text, text>>,"
 +              + "compression frozen<map<text, text>>,"
 +              + "crc_check_chance double,"
-               + "dclocal_read_repair_chance double,"
++              + "dclocal_read_repair_chance double," // no longer used, left for drivers' sake
 +              + "default_time_to_live int,"
 +              + "extensions frozen<map<text, blob>>,"
 +              + "gc_grace_seconds int,"
 +              + "id uuid,"
 +              + "include_all_columns boolean,"
 +              + "max_index_interval int,"
 +              + "memtable_flush_period_in_ms int,"
 +              + "min_index_interval int,"
-               + "read_repair_chance double,"
++              + "read_repair_chance double," // no longer used, left for drivers' sake
 +              + "speculative_retry text,"
 +              + "cdc boolean,"
 +              + "PRIMARY KEY ((keyspace_name), view_name))");
 +
 +    private static final TableMetadata Indexes =
 +        parse(INDEXES,
 +              "secondary index definitions",
 +              "CREATE TABLE %s ("
 +              + "keyspace_name text,"
 +              + "table_name text,"
 +              + "index_name text,"
 +              + "kind text,"
 +              + "options frozen<map<text, text>>,"
 +              + "PRIMARY KEY ((keyspace_name), table_name, index_name))");
 +
 +    private static final TableMetadata Types =
 +        parse(TYPES,
 +              "user defined type definitions",
 +              "CREATE TABLE %s ("
 +              + "keyspace_name text,"
 +              + "type_name text,"
 +              + "field_names frozen<list<text>>,"
 +              + "field_types frozen<list<text>>,"
 +              + "PRIMARY KEY ((keyspace_name), type_name))");
 +
 +    private static final TableMetadata Functions =
 +        parse(FUNCTIONS,
 +              "user defined function definitions",
 +              "CREATE TABLE %s ("
 +              + "keyspace_name text,"
 +              + "function_name text,"
 +              + "argument_types frozen<list<text>>,"
 +              + "argument_names frozen<list<text>>,"
 +              + "body text,"
 +              + "language text,"
 +              + "return_type text,"
 +              + "called_on_null_input boolean,"
 +              + "PRIMARY KEY ((keyspace_name), function_name, argument_types))");
 +
 +    private static final TableMetadata Aggregates =
 +        parse(AGGREGATES,
 +              "user defined aggregate definitions",
 +              "CREATE TABLE %s ("
 +              + "keyspace_name text,"
 +              + "aggregate_name text,"
 +              + "argument_types frozen<list<text>>,"
 +              + "final_func text,"
 +              + "initcond text,"
 +              + "return_type text,"
 +              + "state_func text,"
 +              + "state_type text,"
 +              + "PRIMARY KEY ((keyspace_name), aggregate_name, argument_types))");
 +
 +    private static final List<TableMetadata> ALL_TABLE_METADATA =
         ImmutableList.of(Keyspaces, Tables, Columns, Triggers, DroppedColumns, Views, Types, Functions, Aggregates, Indexes);
  
 -    private static CFMetaData compile(String name, String description, String schema)
 +    private static TableMetadata parse(String name, String description, String cql)
      {
 -        return CFMetaData.compile(String.format(schema, name), SchemaConstants.SCHEMA_KEYSPACE_NAME)
 -                         .comment(description)
 -                         .gcGraceSeconds((int) TimeUnit.DAYS.toSeconds(7));
 +        return CreateTableStatement.parse(format(cql, name), SchemaConstants.SCHEMA_KEYSPACE_NAME)
 +                                   .id(TableId.forSystemTable(SchemaConstants.SCHEMA_KEYSPACE_NAME, name))
-                                    .dcLocalReadRepairChance(0.0)
 +                                   .gcGraceSeconds((int) TimeUnit.DAYS.toSeconds(7))
 +                                   .memtableFlushPeriod((int) TimeUnit.HOURS.toMillis(1))
 +                                   .comment(description)
 +                                   .build();
      }
  
      public static KeyspaceMetadata metadata()
@@@ -524,13 -553,13 +523,13 @@@
      {
          builder.add("bloom_filter_fp_chance", params.bloomFilterFpChance)
                 .add("comment", params.comment)
--               .add("dclocal_read_repair_chance", params.dcLocalReadRepairChance)
++               .add("dclocal_read_repair_chance", 0.0) // no longer used, left for drivers' sake
                 .add("default_time_to_live", params.defaultTimeToLive)
                 .add("gc_grace_seconds", params.gcGraceSeconds)
                 .add("max_index_interval", params.maxIndexInterval)
                 .add("memtable_flush_period_in_ms", 
params.memtableFlushPeriodInMs)
                 .add("min_index_interval", params.minIndexInterval)
--               .add("read_repair_chance", params.readRepairChance)
++               .add("read_repair_chance", 0.0) // no longer used, left for drivers' sake
                 .add("speculative_retry", params.speculativeRetry.toString())
                 .add("crc_check_chance", params.crcCheckChance)
                 .add("caching", params.caching.asMap())
@@@ -994,17 -1070,17 +993,15 @@@
                          .comment(row.getString("comment"))
                          .compaction(CompactionParams.fromMap(row.getFrozenTextMap("compaction")))
                          .compression(CompressionParams.fromMap(row.getFrozenTextMap("compression")))
--                          .dcLocalReadRepairChance(row.getDouble("dclocal_read_repair_chance"))
                          .defaultTimeToLive(row.getInt("default_time_to_live"))
                          .extensions(row.getFrozenMap("extensions", UTF8Type.instance, BytesType.instance))
                          .gcGraceSeconds(row.getInt("gc_grace_seconds"))
                          .maxIndexInterval(row.getInt("max_index_interval"))
                          .memtableFlushPeriodInMs(row.getInt("memtable_flush_period_in_ms"))
                          .minIndexInterval(row.getInt("min_index_interval"))
--                          .readRepairChance(row.getDouble("read_repair_chance"))
                          .crcCheckChance(row.getDouble("crc_check_chance"))
 -                          .speculativeRetry(SpeculativeRetryParam.fromString(row.getString("speculative_retry")))
 -                          .cdc(row.has("cdc") ? row.getBoolean("cdc") : false)
 +                          .speculativeRetry(SpeculativeRetryPolicy.fromString(row.getString("speculative_retry")))
 +                          .cdc(row.has("cdc") && row.getBoolean("cdc"))
                            .build();
      }
  

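For context on the new convention above: parse() replaces CFMetaData.compile() with CreateTableStatement.parse() feeding an explicit builder chain. A minimal sketch of the same pattern for a hypothetical system table follows (the table name, CQL, and comment are illustrative only, not part of this commit; TableId.forSystemTable gives the table a fixed id derived from the keyspace and table name):

    // Sketch only: mirrors the parse() helper above for a hypothetical table "example".
    private static TableMetadata parseExample()
    {
        String cql = "CREATE TABLE %s (keyspace_name text, PRIMARY KEY ((keyspace_name)))";
        return CreateTableStatement.parse(String.format(cql, "example"), SchemaConstants.SCHEMA_KEYSPACE_NAME)
                                   .id(TableId.forSystemTable(SchemaConstants.SCHEMA_KEYSPACE_NAME, "example"))
                                   .gcGraceSeconds((int) TimeUnit.DAYS.toSeconds(7))   // 7-day GC grace, as above
                                   .memtableFlushPeriod((int) TimeUnit.HOURS.toMillis(1)) // hourly flush, as above
                                   .comment("example definitions")
                                   .build();
    }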
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad99802/src/java/org/apache/cassandra/schema/TableMetadata.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/schema/TableMetadata.java
index 3ccc3c5,0000000..4634438
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/schema/TableMetadata.java
+++ b/src/java/org/apache/cassandra/schema/TableMetadata.java
@@@ -1,1000 -1,0 +1,984 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.schema;
 +
 +import java.nio.ByteBuffer;
 +import java.util.*;
 +import java.util.Objects;
 +
 +import com.google.common.base.MoreObjects;
 +import com.google.common.collect.*;
 +
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.auth.DataResource;
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.cql3.ColumnIdentifier;
 +import org.apache.cassandra.db.*;
 +import org.apache.cassandra.db.marshal.*;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.exceptions.ConfigurationException;
 +import org.apache.cassandra.service.reads.SpeculativeRetryPolicy;
 +import org.apache.cassandra.utils.AbstractIterator;
 +import org.github.jamm.Unmetered;
 +
 +import static java.lang.String.format;
 +import static java.util.stream.Collectors.toList;
 +import static java.util.stream.Collectors.toSet;
 +
 +import static com.google.common.collect.Iterables.transform;
 +import static org.apache.cassandra.schema.IndexMetadata.isNameValid;
 +
 +@Unmetered
 +public final class TableMetadata
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(TableMetadata.class);
 +    private static final ImmutableSet<Flag> DEFAULT_CQL_FLAGS = ImmutableSet.of(Flag.COMPOUND);
 +    private static final ImmutableSet<Flag> DEPRECATED_CS_FLAGS = ImmutableSet.of(Flag.DENSE, Flag.SUPER);
 +
 +    public static final String COMPACT_STORAGE_HALT_MESSAGE =
 +             "Compact Tables are not allowed in Cassandra starting with 4.0 version. " +
 +             "Use `ALTER ... DROP COMPACT STORAGE` command supplied in 3.x/3.11 Cassandra " +
 +             "in order to migrate off Compact Storage.";
 +
 +    private static final String COMPACT_STORAGE_DEPRECATION_MESSAGE =
 +             "Incorrect set of flags was detected in table {}.{}: '{}'. \n" +
 +             "Starting with version 4.0, '{}' flags are deprecated and every table has to have COMPOUND flag. \n" +
 +             "Forcing the following set of flags: '{}'";
 +
 +    public enum Flag
 +    {
 +        SUPER, COUNTER, DENSE, COMPOUND;
 +
 +        public static boolean isCQLCompatible(Set<Flag> flags)
 +        {
 +            return !flags.contains(Flag.DENSE) && !flags.contains(Flag.SUPER) && flags.contains(Flag.COMPOUND);
 +        }
 +
 +        public static Set<Flag> fromStringSet(Set<String> strings)
 +        {
 +            return strings.stream().map(String::toUpperCase).map(Flag::valueOf).collect(toSet());
 +        }
 +
 +        public static Set<String> toStringSet(Set<Flag> flags)
 +        {
 +            return flags.stream().map(Flag::toString).map(String::toLowerCase).collect(toSet());
 +        }
 +    }
 +
 +    public final String keyspace;
 +    public final String name;
 +    public final TableId id;
 +
 +    public final IPartitioner partitioner;
 +    public final TableParams params;
 +    public final ImmutableSet<Flag> flags;
 +
 +    private final boolean isView;
 +    private final String indexName; // derived from table name
 +
 +    /*
 +     * All CQL3 column definitions are stored in the columns map.
 +     * On top of that, we keep separate collections for each kind of definition, to
 +     * 1) allow easy access to each kind and
 +     * 2) ensure that, for the partition key and clustering key ones, the lists are ordered by the "component index" of the elements.
 +     */
 +    public final ImmutableMap<ByteBuffer, DroppedColumn> droppedColumns;
 +    final ImmutableMap<ByteBuffer, ColumnMetadata> columns;
 +
 +    private final ImmutableList<ColumnMetadata> partitionKeyColumns;
 +    private final ImmutableList<ColumnMetadata> clusteringColumns;
 +    private final RegularAndStaticColumns regularAndStaticColumns;
 +
 +    public final Indexes indexes;
 +    public final Triggers triggers;
 +
 +    // derived automatically from flags and columns
 +    public final AbstractType<?> partitionKeyType;
 +    public final ClusteringComparator comparator;
 +
 +    /*
 +     * For dense tables, this aliases the single non-PK column the table contains (since it can only have one). We keep
 +     * that as a convenience to access that column more easily (but we could replace calls by
 +     * regularAndStaticColumns().iterator().next() for those tables in practice).
 +     */
 +    public final ColumnMetadata compactValueColumn;
 +
 +    // performance hacks; TODO see if all are really necessary
 +    public final DataResource resource;
 +
 +    private TableMetadata(Builder builder)
 +    {
 +        if (!Flag.isCQLCompatible(builder.flags))
 +        {
 +            flags = ImmutableSet.copyOf(Sets.union(Sets.difference(builder.flags, DEPRECATED_CS_FLAGS), DEFAULT_CQL_FLAGS));
 +            logger.warn(COMPACT_STORAGE_DEPRECATION_MESSAGE, builder.keyspace, builder.name, builder.flags, DEPRECATED_CS_FLAGS, flags);
 +        }
 +        else
 +        {
 +            flags = Sets.immutableEnumSet(builder.flags);
 +        }
 +        keyspace = builder.keyspace;
 +        name = builder.name;
 +        id = builder.id;
 +
 +        partitioner = builder.partitioner;
 +        params = builder.params.build();
 +        isView = builder.isView;
 +
 +        indexName = name.contains(".")
 +                  ? name.substring(name.indexOf('.') + 1)
 +                  : null;
 +
 +        droppedColumns = ImmutableMap.copyOf(builder.droppedColumns);
 +        Collections.sort(builder.partitionKeyColumns);
 +        partitionKeyColumns = ImmutableList.copyOf(builder.partitionKeyColumns);
 +        Collections.sort(builder.clusteringColumns);
 +        clusteringColumns = ImmutableList.copyOf(builder.clusteringColumns);
 +        regularAndStaticColumns = RegularAndStaticColumns.builder().addAll(builder.regularAndStaticColumns).build();
 +        columns = ImmutableMap.copyOf(builder.columns);
 +
 +        indexes = builder.indexes;
 +        triggers = builder.triggers;
 +
 +        partitionKeyType = partitionKeyColumns.size() == 1
 +                         ? partitionKeyColumns.get(0).type
 +                         : CompositeType.getInstance(transform(partitionKeyColumns, t -> t.type));
 +
 +        comparator = new ClusteringComparator(transform(clusteringColumns, c -> c.type));
 +
 +        compactValueColumn = isCompactTable()
 +                           ? CompactTables.getCompactValueColumn(regularAndStaticColumns, isSuper())
 +                           : null;
 +
 +        resource = DataResource.table(keyspace, name);
 +    }
 +
 +    public static Builder builder(String keyspace, String table)
 +    {
 +        return new Builder(keyspace, table);
 +    }
 +
 +    public static Builder builder(String keyspace, String table, TableId id)
 +    {
 +        return new Builder(keyspace, table, id);
 +    }
 +
 +    public Builder unbuild()
 +    {
 +        return builder(keyspace, name, id)
 +               .partitioner(partitioner)
 +               .params(params)
 +               .flags(flags)
 +               .isView(isView)
 +               .addColumns(columns())
 +               .droppedColumns(droppedColumns)
 +               .indexes(indexes)
 +               .triggers(triggers);
 +    }
 +
 +    public boolean isView()
 +    {
 +        return isView;
 +    }
 +
 +    public boolean isIndex()
 +    {
 +        return indexName != null;
 +    }
 +
 +    public Optional<String> indexName()
 +    {
 +        return Optional.ofNullable(indexName);
 +    }
 +
 +    /*
 +     * We call dense a CF for which each component of the comparator is a clustering column, i.e. no
 +     * component is used to store regular column names. In other words, non-composite static "thrift"
 +     * and CQL3 CFs are *not* dense.
 +     */
 +    public boolean isDense()
 +    {
 +        return flags.contains(Flag.DENSE);
 +    }
 +
 +    public boolean isCompound()
 +    {
 +        return flags.contains(Flag.COMPOUND);
 +    }
 +
 +    public boolean isSuper()
 +    {
 +        return flags.contains(Flag.SUPER);
 +    }
 +
 +    public boolean isCounter()
 +    {
 +        return flags.contains(Flag.COUNTER);
 +    }
 +
 +    public boolean isCQLTable()
 +    {
 +        return !isSuper() && !isDense() && isCompound();
 +    }
 +
 +    public boolean isCompactTable()
 +    {
 +        return !isCQLTable();
 +    }
 +
 +    public boolean isStaticCompactTable()
 +    {
 +        return !isSuper() && !isDense() && !isCompound();
 +    }
 +
 +    public ImmutableCollection<ColumnMetadata> columns()
 +    {
 +        return columns.values();
 +    }
 +
 +    public Iterable<ColumnMetadata> primaryKeyColumns()
 +    {
 +        return Iterables.concat(partitionKeyColumns, clusteringColumns);
 +    }
 +
 +    public ImmutableList<ColumnMetadata> partitionKeyColumns()
 +    {
 +        return partitionKeyColumns;
 +    }
 +
 +    public ImmutableList<ColumnMetadata> clusteringColumns()
 +    {
 +        return clusteringColumns;
 +    }
 +
 +    public RegularAndStaticColumns regularAndStaticColumns()
 +    {
 +        return regularAndStaticColumns;
 +    }
 +
 +    public Columns regularColumns()
 +    {
 +        return regularAndStaticColumns.regulars;
 +    }
 +
 +    public Columns staticColumns()
 +    {
 +        return regularAndStaticColumns.statics;
 +    }
 +
 +    /*
 +     * An iterator over all column definitions that respects the order of a SELECT *.
 +     * This also "hides" the clustering/regular columns of a non-CQL3 non-dense table, for backward
 +     * compatibility's sake.
 +     */
 +    public Iterator<ColumnMetadata> allColumnsInSelectOrder()
 +    {
 +        final boolean isStaticCompactTable = isStaticCompactTable();
 +        final boolean noNonPkColumns = isCompactTable() && CompactTables.hasEmptyCompactValue(this);
 +
 +        return new AbstractIterator<ColumnMetadata>()
 +        {
 +            private final Iterator<ColumnMetadata> partitionKeyIter = partitionKeyColumns.iterator();
 +            private final Iterator<ColumnMetadata> clusteringIter =
 +                isStaticCompactTable ? Collections.emptyIterator() : clusteringColumns.iterator();
 +            private final Iterator<ColumnMetadata> otherColumns =
 +                noNonPkColumns
 +              ? Collections.emptyIterator()
 +              : (isStaticCompactTable ? staticColumns().selectOrderIterator()
 +                                      : regularAndStaticColumns.selectOrderIterator());
 +
 +            protected ColumnMetadata computeNext()
 +            {
 +                if (partitionKeyIter.hasNext())
 +                    return partitionKeyIter.next();
 +
 +                if (clusteringIter.hasNext())
 +                    return clusteringIter.next();
 +
 +                return otherColumns.hasNext() ? otherColumns.next() : endOfData();
 +            }
 +        };
 +    }
 +
 +    /**
 +     * Returns the ColumnMetadata for {@code name}.
 +     */
 +    public ColumnMetadata getColumn(ColumnIdentifier name)
 +    {
 +        return columns.get(name.bytes);
 +    }
 +
 +    /*
 +     * In general it is preferable to work with ColumnIdentifier to make it
 +     * clear that we are talking about a CQL column, not a cell name, but there
 +     * are a few cases where all we have is a ByteBuffer (when dealing with
 +     * IndexExpression, for instance), so...
 +     */
 +    public ColumnMetadata getColumn(ByteBuffer name)
 +    {
 +        return columns.get(name);
 +    }
 +
 +    public ColumnMetadata getDroppedColumn(ByteBuffer name)
 +    {
 +        DroppedColumn dropped = droppedColumns.get(name);
 +        return dropped == null ? null : dropped.column;
 +    }
 +
 +    /**
 +     * Returns a "fake" ColumnMetadata corresponding to the dropped column {@code name},
 +     * or {@code null} if there is no such dropped column.
 +     *
 +     * @param name - the column name
 +     * @param isStatic - whether the column was a static column, if known
 +     */
 +    public ColumnMetadata getDroppedColumn(ByteBuffer name, boolean isStatic)
 +    {
 +        DroppedColumn dropped = droppedColumns.get(name);
 +        if (dropped == null)
 +            return null;
 +
 +        if (isStatic && !dropped.column.isStatic())
 +            return ColumnMetadata.staticColumn(this, name, dropped.column.type);
 +
 +        return dropped.column;
 +    }
 +
 +    public boolean hasStaticColumns()
 +    {
 +        return !staticColumns().isEmpty();
 +    }
 +
 +    public void validate()
 +    {
 +        if (!isNameValid(keyspace))
 +            except("Keyspace name must not be empty, more than %s characters long, or contain non-alphanumeric-underscore characters (got \"%s\")", SchemaConstants.NAME_LENGTH, keyspace);
 +
 +        if (!isNameValid(name))
 +            except("Table name must not be empty, more than %s characters long, or contain non-alphanumeric-underscore characters (got \"%s\")", SchemaConstants.NAME_LENGTH, name);
 +
 +        params.validate();
 +
 +        if (partitionKeyColumns.stream().anyMatch(c -> c.type.isCounter()))
 +            except("PRIMARY KEY columns cannot contain counters");
 +
 +        // Mixing counter with non counter columns is not supported (#2614)
 +        if (isCounter())
 +        {
 +            for (ColumnMetadata column : regularAndStaticColumns)
 +                if (!(column.type.isCounter()) && !CompactTables.isSuperColumnMapColumn(column))
 +                    except("Cannot have a non counter column (\"%s\") in a counter table", column.name);
 +        }
 +        else
 +        {
 +            for (ColumnMetadata column : regularAndStaticColumns)
 +                if (column.type.isCounter())
 +                    except("Cannot have a counter column (\"%s\") in a non counter column table", column.name);
 +        }
 +
 +        // All tables should have a partition key
 +        if (partitionKeyColumns.isEmpty())
 +            except("Missing partition keys for table %s", toString());
 +
 +        // A compact table should always have a clustering
 +        if (isCompactTable() && clusteringColumns.isEmpty())
 +            except("For table %s, isDense=%b, isCompound=%b, clustering=%s", toString(), isDense(), isCompound(), clusteringColumns);
 +
 +        if (!indexes.isEmpty() && isSuper())
 +            except("Secondary indexes are not supported on super column families");
 +
 +        indexes.validate(this);
 +    }
 +
 +    void validateCompatibility(TableMetadata other)
 +    {
 +        if (isIndex())
 +            return;
 +
 +        if (!other.keyspace.equals(keyspace))
 +            except("Keyspace mismatch (found %s; expected %s)", other.keyspace, keyspace);
 +
 +        if (!other.name.equals(name))
 +            except("Table mismatch (found %s; expected %s)", other.name, name);
 +
 +        if (!other.id.equals(id))
 +            except("Table ID mismatch (found %s; expected %s)", other.id, id);
 +
 +        if (!other.flags.equals(flags))
 +            except("Table type mismatch (found %s; expected %s)", other.flags, flags);
 +
 +        if (other.partitionKeyColumns.size() != partitionKeyColumns.size())
 +            except("Partition keys of different length (found %s; expected %s)", other.partitionKeyColumns.size(), partitionKeyColumns.size());
 +
 +        for (int i = 0; i < partitionKeyColumns.size(); i++)
 +            if (!other.partitionKeyColumns.get(i).type.isCompatibleWith(partitionKeyColumns.get(i).type))
 +                except("Partition key column mismatch (found %s; expected %s)", other.partitionKeyColumns.get(i).type, partitionKeyColumns.get(i).type);
 +
 +        if (other.clusteringColumns.size() != clusteringColumns.size())
 +            except("Clustering columns of different length (found %s; expected %s)", other.clusteringColumns.size(), clusteringColumns.size());
 +
 +        for (int i = 0; i < clusteringColumns.size(); i++)
 +            if (!other.clusteringColumns.get(i).type.isCompatibleWith(clusteringColumns.get(i).type))
 +                except("Clustering column mismatch (found %s; expected %s)", other.clusteringColumns.get(i).type, clusteringColumns.get(i).type);
 +
 +        for (ColumnMetadata otherColumn : other.regularAndStaticColumns)
 +        {
 +            ColumnMetadata column = getColumn(otherColumn.name);
 +            if (column != null && !otherColumn.type.isCompatibleWith(column.type))
 +                except("Column mismatch (found %s; expected %s)", otherColumn, column);
 +        }
 +    }
 +
 +    public ClusteringComparator partitionKeyAsClusteringComparator()
 +    {
 +        return new ClusteringComparator(partitionKeyColumns.stream().map(c -> c.type).collect(toList()));
 +    }
 +
 +    /**
 +     * The type to use to compare column names in "static compact"
 +     * tables or superColumn ones.
 +     * <p>
 +     * This exists because for historical reasons, "static compact" tables as
 +     * well as super column ones can have non-UTF8 column names.
 +     * <p>
 +     * This method should only be called for superColumn tables and "static
 +     * compact" ones. For any other table, all column names are UTF8.
 +     */
 +    public AbstractType<?> staticCompactOrSuperTableColumnNameType()
 +    {
 +        if (isSuper())
 +        {
 +            assert compactValueColumn != null && compactValueColumn.type instanceof MapType;
 +            return ((MapType) compactValueColumn.type).nameComparator();
 +        }
 +
 +        assert isStaticCompactTable();
 +        return clusteringColumns.get(0).type;
 +    }
 +
 +    public AbstractType<?> columnDefinitionNameComparator(ColumnMetadata.Kind kind)
 +    {
 +        return (isSuper() && kind == ColumnMetadata.Kind.REGULAR) || (isStaticCompactTable() && kind == ColumnMetadata.Kind.STATIC)
 +             ? staticCompactOrSuperTableColumnNameType()
 +             : UTF8Type.instance;
 +    }
 +
 +    /**
 +     * Generate a table name for an index corresponding to the given column.
 +     * This is NOT the same as the index's name! This is only used in sstable filenames and is not exposed to users.
 +     *
 +     * @param info A definition of the column with index
 +     *
 +     * @return name of the index table
 +     */
 +    public String indexTableName(IndexMetadata info)
 +    {
 +        // TODO simplify this when info.index_name is guaranteed to be set
 +        return name + Directories.SECONDARY_INDEX_NAME_SEPARATOR + info.name;
 +    }
 +
 +    /**
 +     * @return true if the change, as made, impacts queries/updates on the table,
 +     *         e.g. any columns or indexes were added, removed, or altered; otherwise, false is returned.
 +     *         Used to determine whether prepared statements against this table need to be re-prepared.
 +     */
 +    boolean changeAffectsPreparedStatements(TableMetadata updated)
 +    {
 +        return !partitionKeyColumns.equals(updated.partitionKeyColumns)
 +            || !clusteringColumns.equals(updated.clusteringColumns)
 +            || !regularAndStaticColumns.equals(updated.regularAndStaticColumns)
 +            || !indexes.equals(updated.indexes)
 +            || params.defaultTimeToLive != updated.params.defaultTimeToLive
 +            || params.gcGraceSeconds != updated.params.gcGraceSeconds;
 +    }
 +
 +    /**
 +     * There are a couple of places in the code where we need a TableMetadata object and don't have one readily available,
 +     * and know that only the keyspace and name matter. This creates such "fake" metadata. Use only if you know what
 +     * you're doing.
 +     */
 +    public static TableMetadata minimal(String keyspace, String name)
 +    {
 +        return TableMetadata.builder(keyspace, name)
 +                            .addPartitionKeyColumn("key", BytesType.instance)
 +                            .build();
 +    }
 +
 +    public TableMetadata updateIndexTableMetadata(TableParams baseTableParams)
 +    {
-         TableParams.Builder builder =
-             baseTableParams.unbuild()
-                            .readRepairChance(0.0)
-                            .dcLocalReadRepairChance(0.0)
-                            .gcGraceSeconds(0);
++        TableParams.Builder builder = baseTableParams.unbuild().gcGraceSeconds(0);
 +
 +        // Depending on the parent's cache setting, turn on the index table's key cache.
 +        // Row caching is never enabled; see CASSANDRA-5732
 +        builder.caching(baseTableParams.caching.cacheKeys() ? CachingParams.CACHE_KEYS : CachingParams.CACHE_NOTHING);
 +
 +        return unbuild().params(builder.build()).build();
 +    }
 +
 +    private void except(String format, Object... args)
 +    {
 +        throw new ConfigurationException(keyspace + "." + name + ": " + format(format, args));
 +    }
 +
 +    @Override
 +    public boolean equals(Object o)
 +    {
 +        if (this == o)
 +            return true;
 +
 +        if (!(o instanceof TableMetadata))
 +            return false;
 +
 +        TableMetadata tm = (TableMetadata) o;
 +
 +        return keyspace.equals(tm.keyspace)
 +            && name.equals(tm.name)
 +            && id.equals(tm.id)
 +            && partitioner.equals(tm.partitioner)
 +            && params.equals(tm.params)
 +            && flags.equals(tm.flags)
 +            && isView == tm.isView
 +            && columns.equals(tm.columns)
 +            && droppedColumns.equals(tm.droppedColumns)
 +            && indexes.equals(tm.indexes)
 +            && triggers.equals(tm.triggers);
 +    }
 +
 +    @Override
 +    public int hashCode()
 +    {
 +        return Objects.hash(keyspace, name, id, partitioner, params, flags, isView, columns, droppedColumns, indexes, triggers);
 +    }
 +
 +    @Override
 +    public String toString()
 +    {
 +        return String.format("%s.%s", ColumnIdentifier.maybeQuote(keyspace), ColumnIdentifier.maybeQuote(name));
 +    }
 +
 +    public String toDebugString()
 +    {
 +        return MoreObjects.toStringHelper(this)
 +                          .add("keyspace", keyspace)
 +                          .add("table", name)
 +                          .add("id", id)
 +                          .add("partitioner", partitioner)
 +                          .add("params", params)
 +                          .add("flags", flags)
 +                          .add("isView", isView)
 +                          .add("columns", columns())
 +                          .add("droppedColumns", droppedColumns.values())
 +                          .add("indexes", indexes)
 +                          .add("triggers", triggers)
 +                          .toString();
 +    }
 +
 +    public static final class Builder
 +    {
 +        final String keyspace;
 +        final String name;
 +
 +        private TableId id;
 +
 +        private IPartitioner partitioner;
 +        private TableParams.Builder params = TableParams.builder();
 +
 +        // Setting compound as default, as "normal" CQL tables are compound and that's what we want by default
 +        private Set<Flag> flags = EnumSet.of(Flag.COMPOUND);
 +        private Triggers triggers = Triggers.none();
 +        private Indexes indexes = Indexes.none();
 +
 +        private final Map<ByteBuffer, DroppedColumn> droppedColumns = new HashMap<>();
 +        private final Map<ByteBuffer, ColumnMetadata> columns = new HashMap<>();
 +        private final List<ColumnMetadata> partitionKeyColumns = new ArrayList<>();
 +        private final List<ColumnMetadata> clusteringColumns = new ArrayList<>();
 +        private final List<ColumnMetadata> regularAndStaticColumns = new ArrayList<>();
 +
 +        private boolean isView;
 +
 +        private Builder(String keyspace, String name, TableId id)
 +        {
 +            this.keyspace = keyspace;
 +            this.name = name;
 +            this.id = id;
 +        }
 +
 +        private Builder(String keyspace, String name)
 +        {
 +            this.keyspace = keyspace;
 +            this.name = name;
 +        }
 +
 +        public TableMetadata build()
 +        {
 +            if (partitioner == null)
 +                partitioner = DatabaseDescriptor.getPartitioner();
 +
 +            if (id == null)
 +                id = TableId.generate();
 +
 +            return new TableMetadata(this);
 +        }
 +
 +        public Builder id(TableId val)
 +        {
 +            id = val;
 +            return this;
 +        }
 +
 +        public Builder partitioner(IPartitioner val)
 +        {
 +            partitioner = val;
 +            return this;
 +        }
 +
 +        public Builder params(TableParams val)
 +        {
 +            params = val.unbuild();
 +            return this;
 +        }
 +
 +        public Builder bloomFilterFpChance(double val)
 +        {
 +            params.bloomFilterFpChance(val);
 +            return this;
 +        }
 +
 +        public Builder caching(CachingParams val)
 +        {
 +            params.caching(val);
 +            return this;
 +        }
 +
 +        public Builder comment(String val)
 +        {
 +            params.comment(val);
 +            return this;
 +        }
 +
 +        public Builder compaction(CompactionParams val)
 +        {
 +            params.compaction(val);
 +            return this;
 +        }
 +
 +        public Builder compression(CompressionParams val)
 +        {
 +            params.compression(val);
 +            return this;
 +        }
 +
-         public Builder dcLocalReadRepairChance(double val)
-         {
-             params.dcLocalReadRepairChance(val);
-             return this;
-         }
- 
 +        public Builder defaultTimeToLive(int val)
 +        {
 +            params.defaultTimeToLive(val);
 +            return this;
 +        }
 +
 +        public Builder gcGraceSeconds(int val)
 +        {
 +            params.gcGraceSeconds(val);
 +            return this;
 +        }
 +
 +        public Builder maxIndexInterval(int val)
 +        {
 +            params.maxIndexInterval(val);
 +            return this;
 +        }
 +
 +        public Builder memtableFlushPeriod(int val)
 +        {
 +            params.memtableFlushPeriodInMs(val);
 +            return this;
 +        }
 +
 +        public Builder minIndexInterval(int val)
 +        {
 +            params.minIndexInterval(val);
 +            return this;
 +        }
 +
-         public Builder readRepairChance(double val)
-         {
-             params.readRepairChance(val);
-             return this;
-         }
- 
 +        public Builder crcCheckChance(double val)
 +        {
 +            params.crcCheckChance(val);
 +            return this;
 +        }
 +
 +        public Builder speculativeRetry(SpeculativeRetryPolicy val)
 +        {
 +            params.speculativeRetry(val);
 +            return this;
 +        }
 +
 +        public Builder extensions(Map<String, ByteBuffer> val)
 +        {
 +            params.extensions(val);
 +            return this;
 +        }
 +
 +        public Builder isView(boolean val)
 +        {
 +            isView = val;
 +            return this;
 +        }
 +
 +        public Builder flags(Set<Flag> val)
 +        {
 +            flags = val;
 +            return this;
 +        }
 +
 +        public Builder isSuper(boolean val)
 +        {
 +            return flag(Flag.SUPER, val);
 +        }
 +
 +        public Builder isCounter(boolean val)
 +        {
 +            return flag(Flag.COUNTER, val);
 +        }
 +
 +        public Builder isDense(boolean val)
 +        {
 +            return flag(Flag.DENSE, val);
 +        }
 +
 +        public Builder isCompound(boolean val)
 +        {
 +            return flag(Flag.COMPOUND, val);
 +        }
 +
 +        private Builder flag(Flag flag, boolean set)
 +        {
 +            if (set) flags.add(flag); else flags.remove(flag);
 +            return this;
 +        }
 +
 +        public Builder triggers(Triggers val)
 +        {
 +            triggers = val;
 +            return this;
 +        }
 +
 +        public Builder indexes(Indexes val)
 +        {
 +            indexes = val;
 +            return this;
 +        }
 +
 +        public Builder addPartitionKeyColumn(String name, AbstractType type)
 +        {
 +            return addPartitionKeyColumn(ColumnIdentifier.getInterned(name, false), type);
 +        }
 +
 +        public Builder addPartitionKeyColumn(ColumnIdentifier name, AbstractType type)
 +        {
 +            return addColumn(new ColumnMetadata(keyspace, this.name, name, type, partitionKeyColumns.size(), ColumnMetadata.Kind.PARTITION_KEY));
 +        }
 +
 +        public Builder addClusteringColumn(String name, AbstractType type)
 +        {
 +            return addClusteringColumn(ColumnIdentifier.getInterned(name, false), type);
 +        }
 +
 +        public Builder addClusteringColumn(ColumnIdentifier name, AbstractType type)
 +        {
 +            return addColumn(new ColumnMetadata(keyspace, this.name, name, type, clusteringColumns.size(), ColumnMetadata.Kind.CLUSTERING));
 +        }
 +
 +        public Builder addRegularColumn(String name, AbstractType type)
 +        {
 +            return addRegularColumn(ColumnIdentifier.getInterned(name, false), type);
 +        }
 +
 +        public Builder addRegularColumn(ColumnIdentifier name, AbstractType type)
 +        {
 +            return addColumn(new ColumnMetadata(keyspace, this.name, name, type, ColumnMetadata.NO_POSITION, ColumnMetadata.Kind.REGULAR));
 +        }
 +
 +        public Builder addStaticColumn(String name, AbstractType type)
 +        {
 +            return addStaticColumn(ColumnIdentifier.getInterned(name, false), type);
 +        }
 +
 +        public Builder addStaticColumn(ColumnIdentifier name, AbstractType type)
 +        {
 +            return addColumn(new ColumnMetadata(keyspace, this.name, name, type, ColumnMetadata.NO_POSITION, ColumnMetadata.Kind.STATIC));
 +        }
 +
 +        public Builder addColumn(ColumnMetadata column)
 +        {
 +            if (columns.containsKey(column.name.bytes))
 +                throw new IllegalArgumentException();
 +
 +            switch (column.kind)
 +            {
 +                case PARTITION_KEY:
 +                    partitionKeyColumns.add(column);
 +                    Collections.sort(partitionKeyColumns);
 +                    break;
 +                case CLUSTERING:
 +                    column.type.checkComparable();
 +                    clusteringColumns.add(column);
 +                    Collections.sort(clusteringColumns);
 +                    break;
 +                default:
 +                    regularAndStaticColumns.add(column);
 +            }
 +
 +            columns.put(column.name.bytes, column);
 +
 +            return this;
 +        }
 +
 +        public Builder addColumns(Iterable<ColumnMetadata> columns)
 +        {
 +            columns.forEach(this::addColumn);
 +            return this;
 +        }
 +
 +        public Builder droppedColumns(Map<ByteBuffer, DroppedColumn> droppedColumns)
 +        {
 +            this.droppedColumns.clear();
 +            this.droppedColumns.putAll(droppedColumns);
 +            return this;
 +        }
 +
 +        /**
 +         * Records a deprecated column for a system table.
 +         */
 +        public Builder recordDeprecatedSystemColumn(String name, AbstractType<?> type)
 +        {
 +            // As we play fast and loose with the removal timestamp, make sure this isn't misused for a non-system table.
 +            assert SchemaConstants.isLocalSystemKeyspace(keyspace);
 +            recordColumnDrop(ColumnMetadata.regularColumn(keyspace, this.name, name, type), Long.MAX_VALUE);
 +            return this;
 +        }
 +
 +        public Builder recordColumnDrop(ColumnMetadata column, long timeMicros)
 +        {
 +            droppedColumns.put(column.name.bytes, new DroppedColumn(column, timeMicros));
 +            return this;
 +        }
 +
 +        public Iterable<ColumnMetadata> columns()
 +        {
 +            return columns.values();
 +        }
 +
 +        public Set<String> columnNames()
 +        {
 +            return columns.values().stream().map(c -> c.name.toString()).collect(toSet());
 +        }
 +
 +        public ColumnMetadata getColumn(ColumnIdentifier identifier)
 +        {
 +            return columns.get(identifier.bytes);
 +        }
 +
 +        public ColumnMetadata getColumn(ByteBuffer name)
 +        {
 +            return columns.get(name);
 +        }
 +
 +        public boolean hasRegularColumns()
 +        {
 +            return regularAndStaticColumns.stream().anyMatch(ColumnMetadata::isRegular);
 +        }
 +
 +        /*
 +         * The following methods all assume a Builder with a valid set of partition key, clustering, regular and static columns.
 +         */
 +
 +        public Builder removeRegularOrStaticColumn(ColumnIdentifier identifier)
 +        {
 +            ColumnMetadata column = columns.get(identifier.bytes);
 +            if (column == null || column.isPrimaryKeyColumn())
 +                throw new IllegalArgumentException();
 +
 +            columns.remove(identifier.bytes);
 +            regularAndStaticColumns.remove(column);
 +
 +            return this;
 +        }
 +
 +        public Builder renamePrimaryKeyColumn(ColumnIdentifier from, ColumnIdentifier to)
 +        {
 +            if (columns.containsKey(to.bytes))
 +                throw new IllegalArgumentException();
 +
 +            ColumnMetadata column = columns.get(from.bytes);
 +            if (column == null || !column.isPrimaryKeyColumn())
 +                throw new IllegalArgumentException();
 +
 +            ColumnMetadata newColumn = column.withNewName(to);
 +            if (column.isPartitionKey())
 +                partitionKeyColumns.set(column.position(), newColumn);
 +            else
 +                clusteringColumns.set(column.position(), newColumn);
 +
 +            columns.remove(from.bytes);
 +            columns.put(to.bytes, newColumn);
 +
 +            return this;
 +        }
 +
 +        public Builder alterColumnType(ColumnIdentifier name, AbstractType<?> type)
 +        {
 +            ColumnMetadata column = columns.get(name.bytes);
 +            if (column == null)
 +                throw new IllegalArgumentException();
 +
 +            ColumnMetadata newColumn = column.withNewType(type);
 +
 +            switch (column.kind)
 +            {
 +                case PARTITION_KEY:
 +                    partitionKeyColumns.set(column.position(), newColumn);
 +                    break;
 +                case CLUSTERING:
 +                    clusteringColumns.set(column.position(), newColumn);
 +                    break;
 +                case REGULAR:
 +                case STATIC:
 +                    regularAndStaticColumns.remove(column);
 +                    regularAndStaticColumns.add(newColumn);
 +                    break;
 +            }
 +
 +            columns.put(column.name.bytes, newColumn);
 +
 +            return this;
 +        }
 +    }
 +
 +    /**
 +     * A table with strict liveness filters/ignores rows without PK liveness info,
 +     * effectively tying the row liveness to its primary key liveness.
 +     *
 +     * Currently this is only used by views with a normal base column as PK column,
 +     * so that updates to other columns do not make the row live when the base column
 +     * is not live. See CASSANDRA-11500.
 +     */
 +    public boolean enforceStrictLiveness()
 +    {
 +        return isView && Keyspace.open(keyspace).viewManager.getByName(name).enforceStrictLiveness();
 +    }
 +}

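A short usage sketch for the Builder defined above, restricted to methods visible in this diff (the keyspace, table, and column names below are made up):

    // Sketch: assembling an immutable TableMetadata via the new Builder.
    // build() falls back to DatabaseDescriptor's partitioner and a freshly
    // generated TableId when neither has been set explicitly.
    TableMetadata table =
        TableMetadata.builder("ks", "t")
                     .addPartitionKeyColumn("pk", UTF8Type.instance)
                     .addClusteringColumn("ck", Int32Type.instance)
                     .addRegularColumn("value", BytesType.instance)
                     .comment("example table")
                     .build();

    // unbuild() round-trips back to a Builder; modified copies are built from it.
    TableMetadata updated = table.unbuild().comment("updated comment").build();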
http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad99802/src/java/org/apache/cassandra/schema/TableParams.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/schema/TableParams.java
index b7f6806,3750aa1..78dc894
--- a/src/java/org/apache/cassandra/schema/TableParams.java
+++ b/src/java/org/apache/cassandra/schema/TableParams.java
@@@ -43,14 -41,14 +43,12 @@@ public final class TableParam
          COMMENT,
          COMPACTION,
          COMPRESSION,
--        DCLOCAL_READ_REPAIR_CHANCE,
          DEFAULT_TIME_TO_LIVE,
          EXTENSIONS,
          GC_GRACE_SECONDS,
          MAX_INDEX_INTERVAL,
          MEMTABLE_FLUSH_PERIOD_IN_MS,
          MIN_INDEX_INTERVAL,
--        READ_REPAIR_CHANCE,
          SPECULATIVE_RETRY,
          CRC_CHECK_CHANCE,
          CDC;
@@@ -62,9 -60,19 +60,7 @@@
          }
      }
  
 -    public static final String DEFAULT_COMMENT = "";
 -    public static final double DEFAULT_READ_REPAIR_CHANCE = 0.0;
 -    public static final double DEFAULT_DCLOCAL_READ_REPAIR_CHANCE = 0.1;
 -    public static final int DEFAULT_GC_GRACE_SECONDS = 864000; // 10 days
 -    public static final int DEFAULT_DEFAULT_TIME_TO_LIVE = 0;
 -    public static final int DEFAULT_MEMTABLE_FLUSH_PERIOD_IN_MS = 0;
 -    public static final int DEFAULT_MIN_INDEX_INTERVAL = 128;
 -    public static final int DEFAULT_MAX_INDEX_INTERVAL = 2048;
 -    public static final double DEFAULT_CRC_CHECK_CHANCE = 1.0;
 -
      public final String comment;
--    public final double readRepairChance;
--    public final double dcLocalReadRepairChance;
      public final double bloomFilterFpChance;
      public final double crcCheckChance;
      public final int gcGraceSeconds;
@@@ -82,8 -90,8 +78,6 @@@
      private TableParams(Builder builder)
      {
          comment = builder.comment;
--        readRepairChance = builder.readRepairChance;
--        dcLocalReadRepairChance = builder.dcLocalReadRepairChance;
          bloomFilterFpChance = builder.bloomFilterFpChance == null
                              ? builder.compaction.defaultBloomFilterFbChance()
                              : builder.bloomFilterFpChance;
@@@ -113,14 -121,14 +107,12 @@@
                              .comment(params.comment)
                              .compaction(params.compaction)
                              .compression(params.compression)
--                            .dcLocalReadRepairChance(params.dcLocalReadRepairChance)
                              .crcCheckChance(params.crcCheckChance)
                              .defaultTimeToLive(params.defaultTimeToLive)
                              .gcGraceSeconds(params.gcGraceSeconds)
                            .maxIndexInterval(params.maxIndexInterval)
                            .memtableFlushPeriodInMs(params.memtableFlushPeriodInMs)
                              .minIndexInterval(params.minIndexInterval)
--                            .readRepairChance(params.readRepairChance)
                              .speculativeRetry(params.speculativeRetry)
                              .extensions(params.extensions)
                              .cdc(params.cdc);
@@@ -145,20 -148,20 +137,6 @@@
                   bloomFilterFpChance);
          }
  
--        if (dcLocalReadRepairChance < 0 || dcLocalReadRepairChance > 1.0)
--        {
--            fail("%s must be larger than or equal to 0 and smaller than or equal to 1.0 (got %s)",
--                 Option.DCLOCAL_READ_REPAIR_CHANCE,
--                 dcLocalReadRepairChance);
--        }
--
--        if (readRepairChance < 0 || readRepairChance > 1.0)
--        {
--            fail("%s must be larger than or equal to 0 and smaller than or equal to 1.0 (got %s)",
--                 Option.READ_REPAIR_CHANCE,
--                 readRepairChance);
--        }
--
          if (crcCheckChance < 0 || crcCheckChance > 1.0)
          {
              fail("%s must be larger than or equal to 0 and smaller than or equal to 1.0 (got %s)",
@@@ -208,8 -211,8 +186,6 @@@
          TableParams p = (TableParams) o;
  
          return comment.equals(p.comment)
--            && readRepairChance == p.readRepairChance
--            && dcLocalReadRepairChance == p.dcLocalReadRepairChance
              && bloomFilterFpChance == p.bloomFilterFpChance
              && crcCheckChance == p.crcCheckChance
              && gcGraceSeconds == p.gcGraceSeconds
@@@ -229,8 -232,8 +205,6 @@@
      public int hashCode()
      {
          return Objects.hashCode(comment,
--                                readRepairChance,
--                                dcLocalReadRepairChance,
                                  bloomFilterFpChance,
                                  crcCheckChance,
                                  gcGraceSeconds,
@@@ -251,8 -254,8 +225,6 @@@
      {
          return MoreObjects.toStringHelper(this)
                            .add(Option.COMMENT.toString(), comment)
--                          .add(Option.READ_REPAIR_CHANCE.toString(), readRepairChance)
--                          .add(Option.DCLOCAL_READ_REPAIR_CHANCE.toString(), dcLocalReadRepairChance)
                            .add(Option.BLOOM_FILTER_FP_CHANCE.toString(), bloomFilterFpChance)
                            .add(Option.CRC_CHECK_CHANCE.toString(), crcCheckChance)
                            .add(Option.GC_GRACE_SECONDS.toString(), gcGraceSeconds)
@@@ -271,17 -274,17 +243,15 @@@
  
      public static final class Builder
      {
 -        private String comment = DEFAULT_COMMENT;
 -        private double readRepairChance = DEFAULT_READ_REPAIR_CHANCE;
 -        private double dcLocalReadRepairChance = DEFAULT_DCLOCAL_READ_REPAIR_CHANCE;
 +        private String comment = "";
-         private double readRepairChance = 0.0;
-         private double dcLocalReadRepairChance = 0.1;
          private Double bloomFilterFpChance;
 -        public Double crcCheckChance = DEFAULT_CRC_CHECK_CHANCE;
 -        private int gcGraceSeconds = DEFAULT_GC_GRACE_SECONDS;
 -        private int defaultTimeToLive = DEFAULT_DEFAULT_TIME_TO_LIVE;
 -        private int memtableFlushPeriodInMs = DEFAULT_MEMTABLE_FLUSH_PERIOD_IN_MS;
 -        private int minIndexInterval = DEFAULT_MIN_INDEX_INTERVAL;
 -        private int maxIndexInterval = DEFAULT_MAX_INDEX_INTERVAL;
 -        private SpeculativeRetryParam speculativeRetry = SpeculativeRetryParam.DEFAULT;
 +        public Double crcCheckChance = 1.0;
 +        private int gcGraceSeconds = 864000; // 10 days
 +        private int defaultTimeToLive = 0;
 +        private int memtableFlushPeriodInMs = 0;
 +        private int minIndexInterval = 128;
 +        private int maxIndexInterval = 2048;
 +        private SpeculativeRetryPolicy speculativeRetry = PercentileSpeculativeRetryPolicy.NINETY_NINE_P;
          private CachingParams caching = CachingParams.DEFAULT;
          private CompactionParams compaction = CompactionParams.DEFAULT;
          private CompressionParams compression = CompressionParams.DEFAULT;
@@@ -303,18 -306,18 +273,6 @@@
              return this;
          }
  
--        public Builder readRepairChance(double val)
--        {
--            readRepairChance = val;
--            return this;
--        }
--
--        public Builder dcLocalReadRepairChance(double val)
--        {
--            dcLocalReadRepairChance = val;
--            return this;
--        }
--
          public Builder bloomFilterFpChance(double val)
          {
              bloomFilterFpChance = val;
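
The read repair chance options removed in this file were validated by the same fail-fast range check that remains for CRC_CHECK_CHANCE: probability-style parameters must lie in [0.0, 1.0]. For illustration, a minimal standalone sketch of that validation pattern (hypothetical class and method names, not the actual TableParams code):

    public final class ChanceValidation
    {
        // Probability-style table options must fall within [0.0, 1.0];
        // anything outside that range is rejected up front.
        static void validateChance(String option, double value)
        {
            if (value < 0 || value > 1.0)
                throw new IllegalArgumentException(
                    String.format("%s must be larger than or equal to 0 and smaller than or equal to 1.0 (got %s)",
                                  option, value));
        }

        public static void main(String[] args)
        {
            validateChance("crc_check_chance", 1.0); // accepted: the builder default above
            validateChance("crc_check_chance", 1.5); // rejected: out of range
        }
    }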

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6ad99802/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --cc src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
index f45e623,0000000..6c8a45a
mode 100644,000000..100644
--- a/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/reads/AbstractReadExecutor.java
@@@ -1,481 -1,0 +1,436 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.cassandra.service.reads;
 +
 +import java.util.List;
- import java.util.concurrent.ThreadLocalRandom;
 +import java.util.concurrent.TimeUnit;
 +
 +import com.google.common.base.Preconditions;
 +import com.google.common.collect.Iterables;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import org.apache.cassandra.concurrent.Stage;
 +import org.apache.cassandra.concurrent.StageManager;
 +import org.apache.cassandra.db.ColumnFamilyStore;
 +import org.apache.cassandra.db.ConsistencyLevel;
 +import org.apache.cassandra.db.DecoratedKey;
 +import org.apache.cassandra.db.ReadCommand;
 +import org.apache.cassandra.db.SinglePartitionReadCommand;
 +import org.apache.cassandra.db.Keyspace;
 +import org.apache.cassandra.db.partitions.PartitionIterator;
 +import org.apache.cassandra.exceptions.ReadFailureException;
 +import org.apache.cassandra.exceptions.ReadTimeoutException;
 +import org.apache.cassandra.exceptions.UnavailableException;
 +import org.apache.cassandra.locator.InetAddressAndPort;
- import org.apache.cassandra.metrics.ReadRepairMetrics;
 +import org.apache.cassandra.net.MessageOut;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.service.StorageProxy;
 +import org.apache.cassandra.service.reads.repair.ReadRepair;
- import org.apache.cassandra.schema.TableMetadata;
 +import org.apache.cassandra.service.StorageProxy.LocalReadRunnable;
 +import org.apache.cassandra.tracing.TraceState;
 +import org.apache.cassandra.tracing.Tracing;
 +
 +/**
 + * Sends a read request to the replicas needed to satisfy a given ConsistencyLevel.
 + *
 + * Optionally, may perform additional requests to provide redundancy against replica failure:
 + * AlwaysSpeculatingReadExecutor will always send a request to one extra replica, while
 + * SpeculatingReadExecutor will wait until it looks like the original request is in danger
 + * of timing out before performing extra reads.
 + */
 +public abstract class AbstractReadExecutor
 +{
 +    private static final Logger logger = LoggerFactory.getLogger(AbstractReadExecutor.class);
 +
 +    protected final ReadCommand command;
 +    protected final ConsistencyLevel consistency;
 +    protected final List<InetAddressAndPort> targetReplicas;
 +    protected final ReadRepair readRepair;
 +    protected final DigestResolver digestResolver;
 +    protected final ReadCallback handler;
 +    protected final TraceState traceState;
 +    protected final ColumnFamilyStore cfs;
 +    protected final long queryStartNanoTime;
 +    protected volatile PartitionIterator result = null;
 +
 +    AbstractReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistency, List<InetAddressAndPort> targetReplicas, long queryStartNanoTime)
 +    {
 +        this.command = command;
 +        this.consistency = consistency;
 +        this.targetReplicas = targetReplicas;
 +        this.readRepair = ReadRepair.create(command, targetReplicas, queryStartNanoTime, consistency);
 +        this.digestResolver = new DigestResolver(keyspace, command, consistency, readRepair, targetReplicas.size());
 +        this.handler = new ReadCallback(digestResolver, consistency, command, targetReplicas, queryStartNanoTime, readRepair);
 +        this.cfs = cfs;
 +        this.traceState = Tracing.instance.get();
 +        this.queryStartNanoTime = queryStartNanoTime;
 +
 +        // Set the digest version (if we request some digests). This is the smallest version amongst all our target replicas, since newer nodes
 +        // know how to produce older digests but the reverse is not true.
 +        // TODO: we need this when talking with pre-3.0 nodes. So if we preserve the digest format moving forward, we can get rid of this once
 +        // we stop being compatible with pre-3.0 nodes.
 +        int digestVersion = MessagingService.current_version;
 +        for (InetAddressAndPort replica : targetReplicas)
 +            digestVersion = Math.min(digestVersion, MessagingService.instance().getVersion(replica));
 +        command.setDigestVersion(digestVersion);
 +    }
 +
 +    private DecoratedKey getKey()
 +    {
 +        if (command instanceof SinglePartitionReadCommand)
 +        {
 +            return ((SinglePartitionReadCommand) command).partitionKey();
 +        }
 +        else
 +        {
 +            return null;
 +        }
 +    }
 +
 +    protected void makeDataRequests(Iterable<InetAddressAndPort> endpoints)
 +    {
 +        makeRequests(command, endpoints);
 +
 +    }
 +
 +    protected void makeDigestRequests(Iterable<InetAddressAndPort> endpoints)
 +    {
 +        makeRequests(command.copyAsDigestQuery(), endpoints);
 +    }
 +
 +    private void makeRequests(ReadCommand readCommand, Iterable<InetAddressAndPort> endpoints)
 +    {
 +        boolean hasLocalEndpoint = false;
 +
 +        for (InetAddressAndPort endpoint : endpoints)
 +        {
 +            if (StorageProxy.canDoLocalRequest(endpoint))
 +            {
 +                hasLocalEndpoint = true;
 +                continue;
 +            }
 +
 +            if (traceState != null)
 +                traceState.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
 +            logger.trace("reading {} from {}", readCommand.isDigestQuery() ? "digest" : "data", endpoint);
 +            MessageOut<ReadCommand> message = readCommand.createMessage();
 +            MessagingService.instance().sendRRWithFailure(message, endpoint, handler);
 +        }
 +
 +        // We delay the local (potentially blocking) read till the end to avoid stalling remote requests.
 +        if (hasLocalEndpoint)
 +        {
 +            logger.trace("reading {} locally", readCommand.isDigestQuery() ? "digest" : "data");
 +            StageManager.getStage(Stage.READ).maybeExecuteImmediately(new LocalReadRunnable(command, handler));
 +        }
 +    }
 +
 +    /**
 +     * Perform additional requests if it looks like the original will time out.  May block while it waits
 +     * to see if the original requests are answered first.
 +     */
 +    public abstract void maybeTryAdditionalReplicas();
 +
 +    /**
 +     * Get the replicas involved in the [finished] request.
 +     *
 +     * @return target replicas + the extra replica, *IF* we speculated.
 +     */
 +    public abstract List<InetAddressAndPort> getContactedReplicas();
 +
 +    /**
 +     * send the initial set of requests
 +     */
 +    public abstract void executeAsync();
 +
-     private static ReadRepairDecision newReadRepairDecision(TableMetadata metadata)
-     {
-         if (metadata.params.readRepairChance > 0d ||
-             metadata.params.dcLocalReadRepairChance > 0)
-         {
-             double chance = ThreadLocalRandom.current().nextDouble();
-             if (metadata.params.readRepairChance > chance)
-                 return ReadRepairDecision.GLOBAL;
- 
-             if (metadata.params.dcLocalReadRepairChance > chance)
-                 return ReadRepairDecision.DC_LOCAL;
-         }
- 
-         return ReadRepairDecision.NONE;
-     }
- 
 +    /**
 +     * @return an executor appropriate for the configured speculative read policy
 +     */
 +    public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand command, ConsistencyLevel consistencyLevel, long queryStartNanoTime) throws UnavailableException
 +    {
 +        Keyspace keyspace = Keyspace.open(command.metadata().keyspace);
 +        List<InetAddressAndPort> allReplicas = StorageProxy.getLiveSortedEndpoints(keyspace, command.partitionKey());
-         // 11980: Excluding EACH_QUORUM reads from potential RR, so that we do not miscount DC responses
-         ReadRepairDecision repairDecision = consistencyLevel == ConsistencyLevel.EACH_QUORUM
-                                             ? ReadRepairDecision.NONE
-                                             : newReadRepairDecision(command.metadata());
-         List<InetAddressAndPort> targetReplicas = consistencyLevel.filterForQuery(keyspace, allReplicas, repairDecision);
++        List<InetAddressAndPort> targetReplicas = consistencyLevel.filterForQuery(keyspace, allReplicas);
 +
 +        // Throw UAE early if we don't have enough replicas.
 +        consistencyLevel.assureSufficientLiveNodes(keyspace, targetReplicas);
 +
-         if (repairDecision != ReadRepairDecision.NONE)
-         {
-             Tracing.trace("Read-repair {}", repairDecision);
-             ReadRepairMetrics.attempted.mark();
-         }
- 
 +        ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(command.metadata().id);
 +        SpeculativeRetryPolicy retry = cfs.metadata().params.speculativeRetry;
 +
 +        // Speculative retry is disabled *OR*
 +        // 11980: Disable speculative retry if using EACH_QUORUM in order to prevent miscounting DC responses
-         if (retry.equals(NeverSpeculativeRetryPolicy.INSTANCE)
-             | consistencyLevel == ConsistencyLevel.EACH_QUORUM)
++        if (retry.equals(NeverSpeculativeRetryPolicy.INSTANCE) || consistencyLevel == ConsistencyLevel.EACH_QUORUM)
 +            return new NeverSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime, false);
 +
 +        // There are simply no extra replicas to speculate.
 +        // Handle this separately so it can log failed attempts to speculate due to lack of replicas
 +        if (consistencyLevel.blockFor(keyspace) == allReplicas.size())
 +            return new NeverSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime, true);
 +
 +        if (targetReplicas.size() == allReplicas.size())
 +        {
-             // CL.ALL, RRD.GLOBAL or RRD.DC_LOCAL and a single-DC.
++            // CL.ALL
 +            // We are going to contact every node anyway, so ask for 2 full data requests instead of 1, for redundancy
 +            // (same amount of requests in total, but we turn 1 digest request into a full blown data request).
 +            return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
 +        }
 +
-         // RRD.NONE or RRD.DC_LOCAL w/ multiple DCs.
-         InetAddressAndPort extraReplica = allReplicas.get(targetReplicas.size());
-         // With repair decision DC_LOCAL all replicas/target replicas may be in different order, so
-         // we might have to find a replacement that's not already in targetReplicas.
-         if (repairDecision == ReadRepairDecision.DC_LOCAL && targetReplicas.contains(extraReplica))
-         {
-             for (InetAddressAndPort address : allReplicas)
-             {
-                 if (!targetReplicas.contains(address))
-                 {
-                     extraReplica = address;
-                     break;
-                 }
-             }
-         }
-         targetReplicas.add(extraReplica);
++        targetReplicas.add(allReplicas.get(targetReplicas.size()));
 +
 +        if (retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE))
 +            return new AlwaysSpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
 +        else // PERCENTILE or CUSTOM.
 +            return new SpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
 +    }
 +
 +    /**
 +     *  Returns true if speculation should occur; if it should, this blocks until it is time to
 +     *  send the speculative reads
 +     */
 +    boolean shouldSpeculateAndMaybeWait()
 +    {
 +        // no latency information, or we're overloaded
 +        if (cfs.sampleLatencyNanos > TimeUnit.MILLISECONDS.toNanos(command.getTimeout()))
 +            return false;
 +
 +        return !handler.await(cfs.sampleLatencyNanos, TimeUnit.NANOSECONDS);
 +    }
 +
 +    void onReadTimeout() {}
 +
 +    public static class NeverSpeculatingReadExecutor extends AbstractReadExecutor
 +    {
 +        /**
 +         * If never speculating due to lack of replicas,
 +         * log it as a failure if it should have happened
 +         * but couldn't due to lack of replicas
 +         */
 +        private final boolean logFailedSpeculation;
 +
 +        public NeverSpeculatingReadExecutor(Keyspace keyspace, ColumnFamilyStore cfs, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddressAndPort> targetReplicas, long queryStartNanoTime, boolean logFailedSpeculation)
 +        {
 +            super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
 +            this.logFailedSpeculation = logFailedSpeculation;
 +        }
 +
 +        public void executeAsync()
 +        {
 +            makeDataRequests(targetReplicas.subList(0, 1));
 +            if (targetReplicas.size() > 1)
 +                makeDigestRequests(targetReplicas.subList(1, targetReplicas.size()));
 +        }
 +
 +        public void maybeTryAdditionalReplicas()
 +        {
 +            if (shouldSpeculateAndMaybeWait() && logFailedSpeculation)
 +            {
 +                cfs.metric.speculativeInsufficientReplicas.inc();
 +            }
 +        }
 +
 +        public List<InetAddressAndPort> getContactedReplicas()
 +        {
 +            return targetReplicas;
 +        }
 +    }
 +
 +    static class SpeculatingReadExecutor extends AbstractReadExecutor
 +    {
 +        private volatile boolean speculated = false;
 +
 +        public SpeculatingReadExecutor(Keyspace keyspace,
 +                                       ColumnFamilyStore cfs,
 +                                       ReadCommand command,
 +                                       ConsistencyLevel consistencyLevel,
 +                                       List<InetAddressAndPort> targetReplicas,
 +                                       long queryStartNanoTime)
 +        {
 +            super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
 +        }
 +
 +        public void executeAsync()
 +        {
 +            // if CL + RR result in covering all replicas, getReadExecutor forces AlwaysSpeculating.  So we know
 +            // that the last replica in our list is "extra."
 +            List<InetAddressAndPort> initialReplicas = targetReplicas.subList(0, targetReplicas.size() - 1);
 +
 +            if (handler.blockfor < initialReplicas.size())
 +            {
 +                // We're hitting additional targets for read repair.  Since our "extra" replica is the least-
 +                // preferred by the snitch, we do an extra data read to start with against a replica more
 +                // likely to reply; better to let RR fail than the entire query.
 +                makeDataRequests(initialReplicas.subList(0, 2));
 +                if (initialReplicas.size() > 2)
 +                    makeDigestRequests(initialReplicas.subList(2, initialReplicas.size()));
 +            }
 +            else
 +            {
 +                // not doing read repair; all replies are important, so it doesn't matter which nodes we
 +                // perform data reads against vs digest.
 +                makeDataRequests(initialReplicas.subList(0, 1));
 +                if (initialReplicas.size() > 1)
 +                    makeDigestRequests(initialReplicas.subList(1, initialReplicas.size()));
 +            }
 +        }
 +
 +        public void maybeTryAdditionalReplicas()
 +        {
 +            if (shouldSpeculateAndMaybeWait())
 +            {
 +                //Handle speculation stats first in case the callback fires immediately
 +                speculated = true;
 +                cfs.metric.speculativeRetries.inc();
 +                // Could be waiting on the data, or on enough digests.
 +                ReadCommand retryCommand = command;
 +                if (handler.resolver.isDataPresent())
 +                    retryCommand = command.copyAsDigestQuery();
 +
 +                InetAddressAndPort extraReplica = Iterables.getLast(targetReplicas);
 +                if (traceState != null)
 +                    traceState.trace("speculating read retry on {}", extraReplica);
 +                logger.trace("speculating read retry on {}", extraReplica);
 +                MessagingService.instance().sendRRWithFailure(retryCommand.createMessage(), extraReplica, handler);
 +            }
 +        }
 +
 +        public List<InetAddressAndPort> getContactedReplicas()
 +        {
 +            return speculated
 +                 ? targetReplicas
 +                 : targetReplicas.subList(0, targetReplicas.size() - 1);
 +        }
 +
 +        @Override
 +        void onReadTimeout()
 +        {
 +            //Shouldn't be possible to get here without first attempting to speculate even if the
 +            //timing is bad
 +            assert speculated;
 +            cfs.metric.speculativeFailedRetries.inc();
 +        }
 +    }
 +
 +    private static class AlwaysSpeculatingReadExecutor extends AbstractReadExecutor
 +    {
 +        public AlwaysSpeculatingReadExecutor(Keyspace keyspace,
 +                                             ColumnFamilyStore cfs,
 +                                             ReadCommand command,
 +                                             ConsistencyLevel consistencyLevel,
 +                                             List<InetAddressAndPort> targetReplicas,
 +                                             long queryStartNanoTime)
 +        {
 +            super(keyspace, cfs, command, consistencyLevel, targetReplicas, queryStartNanoTime);
 +        }
 +
 +        public void maybeTryAdditionalReplicas()
 +        {
 +            // no-op
 +        }
 +
 +        public List<InetAddressAndPort> getContactedReplicas()
 +        {
 +            return targetReplicas;
 +        }
 +
 +        @Override
 +        public void executeAsync()
 +        {
 +            makeDataRequests(targetReplicas.subList(0, targetReplicas.size() > 1 ? 2 : 1));
 +            if (targetReplicas.size() > 2)
 +                makeDigestRequests(targetReplicas.subList(2, targetReplicas.size()));
 +            cfs.metric.speculativeRetries.inc();
 +        }
 +
 +        @Override
 +        void onReadTimeout()
 +        {
 +            cfs.metric.speculativeFailedRetries.inc();
 +        }
 +    }
 +
 +    public void setResult(PartitionIterator result)
 +    {
 +        Preconditions.checkState(this.result == null, "Result can only be set once");
 +        this.result = result;
 +    }
 +
 +    /**
 +     * Wait for the CL to be satisfied by responses
 +     */
 +    public void awaitResponses() throws ReadTimeoutException
 +    {
 +        try
 +        {
 +            handler.awaitResults();
 +        }
 +        catch (ReadTimeoutException e)
 +        {
 +            try
 +            {
 +                onReadTimeout();
 +            }
 +            finally
 +            {
 +                throw e;
 +            }
 +        }
 +
 +        // return immediately, or begin a read repair
 +        if (digestResolver.responsesMatch())
 +        {
 +            setResult(digestResolver.getData());
 +        }
 +        else
 +        {
 +            Tracing.trace("Digest mismatch: Mismatch for key {}", getKey());
-             readRepair.startForegroundRepair(digestResolver, handler.endpoints, getContactedReplicas(), this::setResult);
++            readRepair.startRepair(digestResolver, handler.endpoints, getContactedReplicas(), this::setResult);
 +        }
 +    }
 +
 +    public void awaitReadRepair() throws ReadTimeoutException
 +    {
 +        try
 +        {
-             readRepair.awaitForegroundRepairFinish();
++            readRepair.awaitRepair();
 +        }
 +        catch (ReadTimeoutException e)
 +        {
 +            if (Tracing.isTracing())
 +                Tracing.trace("Timed out waiting on digest mismatch repair requests");
 +            else
 +                logger.trace("Timed out waiting on digest mismatch repair requests");
 +            // the caught exception here will have CL.ALL from the repair command,
 +            // not whatever CL the initial command was at (CASSANDRA-7947)
 +            int blockFor = consistency.blockFor(Keyspace.open(command.metadata().keyspace));
 +            throw new ReadTimeoutException(consistency, blockFor-1, blockFor, true);
 +        }
 +    }
 +
 +    public void maybeRepairAdditionalReplicas()
 +    {
 +        // TODO: this
 +    }
 +
 +    public PartitionIterator getResult() throws ReadFailureException, ReadTimeoutException
 +    {
 +        Preconditions.checkState(result != null, "Result must be set first");
 +        return result;
 +    }
 +}
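
As context for shouldSpeculateAndMaybeWait() above: speculation is skipped outright when the sampled read latency already exceeds the command timeout (either there is no useful latency signal or the node is overloaded); otherwise the executor waits one sampled latency and speculates only if the required responses have not yet arrived. A minimal, self-contained sketch of that decision follows, with hypothetical stand-ins (SpeculationSketch, a CountDownLatch for the response handler) for the real ReadCallback and ColumnFamilyStore.sampleLatencyNanos:

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    public class SpeculationSketch
    {
        // Hypothetical stand-ins: one count per response still required,
        // plus the sampled latency and the configured read timeout.
        final CountDownLatch responses = new CountDownLatch(2); // e.g. blockfor = 2
        final long sampleLatencyNanos = TimeUnit.MILLISECONDS.toNanos(5);
        final long timeoutNanos = TimeUnit.MILLISECONDS.toNanos(5000);

        // Mirrors the shape of shouldSpeculateAndMaybeWait(): bail out when
        // the sampled latency exceeds the timeout, otherwise wait one
        // sampled latency and speculate only if responses are still missing.
        boolean shouldSpeculateAndMaybeWait() throws InterruptedException
        {
            if (sampleLatencyNanos > timeoutNanos)
                return false;
            return !responses.await(sampleLatencyNanos, TimeUnit.NANOSECONDS);
        }
    }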

