http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/view/MaterializedViewUtils.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewUtils.java b/src/java/org/apache/cassandra/db/view/MaterializedViewUtils.java
deleted file mode 100644
index 4dfea75..0000000
--- a/src/java/org/apache/cassandra/db/view/MaterializedViewUtils.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db.view;
-
-import java.net.InetAddress;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.dht.Token;
-import org.apache.cassandra.locator.AbstractReplicationStrategy;
-import org.apache.cassandra.utils.FBUtilities;
-
-public final class MaterializedViewUtils
-{
-    private MaterializedViewUtils()
-    {
-    }
-
-    /**
-     * Calculate the natural endpoint for the view.
-     *
-     * The view natural endpoint is the endpoint which has the same cardinality as this node in the replication factor.
-     * The cardinality is the number at which this node would store a piece of data, given the change in replication
-     * factor.
-     *
-     * For example, if we have the following ring:
-     *   A, T1 -> B, T2 -> C, T3 -> A
-     *
-     * For the token T1, at RF=1, A would be included, so A's cardinality for T1 is 1. For the token T1, at RF=2, B would
-     * be included, so B's cardinality for token T1 is 2. For token T3, at RF = 2, A would be included, so A's cardinality
-     * for T3 is 2.
-     *
-     * For a view whose base token is T1 and whose view token is T3, the pairings between the nodes would be:
-     *  A writes to C (A's cardinality is 1 for T1, and C's cardinality is 1 for T3)
-     *  B writes to A (B's cardinality is 2 for T1, and A's cardinality is 2 for T3)
-     *  C writes to B (C's cardinality is 3 for T1, and B's cardinality is 3 for T3)
-     *
-     * @throws RuntimeException if this method is called using a base token which does not belong to this replica
-     */
-    public static InetAddress getViewNaturalEndpoint(String keyspaceName, Token baseToken, Token viewToken)
-    {
-        AbstractReplicationStrategy replicationStrategy = Keyspace.open(keyspaceName).getReplicationStrategy();
-
-        String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
-        List<InetAddress> localBaseEndpoints = new ArrayList<>();
-        List<InetAddress> localViewEndpoints = new ArrayList<>();
-        for (InetAddress baseEndpoint : replicationStrategy.getNaturalEndpoints(baseToken))
-        {
-            if (DatabaseDescriptor.getEndpointSnitch().getDatacenter(baseEndpoint).equals(localDataCenter))
-                localBaseEndpoints.add(baseEndpoint);
-        }
-
-        for (InetAddress viewEndpoint : replicationStrategy.getNaturalEndpoints(viewToken))
-        {
-            // If we are a base endpoint which is also a view replica, we use ourselves as our view replica
-            if (viewEndpoint.equals(FBUtilities.getBroadcastAddress()))
-                return viewEndpoint;
-
-            // We have to remove any endpoint which is shared between the base and the view, as it will select itself
-            // and throw off the counts otherwise.
-            if (localBaseEndpoints.contains(viewEndpoint))
-                localBaseEndpoints.remove(viewEndpoint);
-            else if (DatabaseDescriptor.getEndpointSnitch().getDatacenter(viewEndpoint).equals(localDataCenter))
-                localViewEndpoints.add(viewEndpoint);
-        }
-
-        // The replication strategy will be the same for the base and the view, as they must belong to the same keyspace.
-        // Since the same replication strategy is used, the same placement should be used and we should get the same
-        // number of replicas for all of the tokens in the ring.
-        assert localBaseEndpoints.size() == localViewEndpoints.size() : "Replication strategy should have the same number of endpoints for the base and the view";
-        int baseIdx = localBaseEndpoints.indexOf(FBUtilities.getBroadcastAddress());
-        if (baseIdx < 0)
-            throw new RuntimeException("Trying to get the view natural endpoint on a non-data replica");
-
-        return localViewEndpoints.get(baseIdx);
-    }
-}
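
For reference, the pairing rule implemented by the removed getViewNaturalEndpoint can be sketched standalone. This is a simplified illustration rather than Cassandra code: endpoints are plain strings, the names are hypothetical, and the per-datacenter filtering is omitted.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified sketch of the base-replica -> view-replica pairing (hypothetical
// names, not a Cassandra API). Each base replica pairs with the view replica
// holding the same position ("cardinality") in its token's replica list.
public final class ViewPairingSketch
{
    static String viewReplicaFor(String self, List<String> baseReplicas, List<String> viewReplicas)
    {
        List<String> base = new ArrayList<>(baseReplicas);
        List<String> view = new ArrayList<>();
        for (String viewEndpoint : viewReplicas)
        {
            if (viewEndpoint.equals(self))
                return self;                 // we are a view replica ourselves
            if (!base.remove(viewEndpoint))  // shared endpoints pair with themselves,
                view.add(viewEndpoint);      // so drop them from both sides
        }
        int baseIdx = base.indexOf(self);
        if (baseIdx < 0)
            throw new IllegalStateException("called on a non-data replica");
        return view.get(baseIdx);
    }

    public static void main(String[] args)
    {
        // Disjoint replica sets pair strictly by position: A->D, B->E, C->F.
        System.out.println(viewReplicaFor("B",
                                          Arrays.asList("A", "B", "C"),
                                          Arrays.asList("D", "E", "F"))); // E
    }
}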

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/view/TemporalRow.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/view/TemporalRow.java b/src/java/org/apache/cassandra/db/view/TemporalRow.java
deleted file mode 100644
index 53e4e91..0000000
--- a/src/java/org/apache/cassandra/db/view/TemporalRow.java
+++ /dev/null
@@ -1,414 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.db.view;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-import com.google.common.collect.Iterables;
-
-import org.apache.cassandra.config.CFMetaData;
-import org.apache.cassandra.config.ColumnDefinition;
-import org.apache.cassandra.cql3.ColumnIdentifier;
-import org.apache.cassandra.db.CBuilder;
-import org.apache.cassandra.db.Clustering;
-import org.apache.cassandra.db.ColumnFamilyStore;
-import org.apache.cassandra.db.Conflicts;
-import org.apache.cassandra.db.DecoratedKey;
-import org.apache.cassandra.db.DeletionInfo;
-import org.apache.cassandra.db.DeletionTime;
-import org.apache.cassandra.db.LivenessInfo;
-import org.apache.cassandra.db.RangeTombstone;
-import org.apache.cassandra.db.Slice;
-import org.apache.cassandra.db.marshal.CompositeType;
-import org.apache.cassandra.db.partitions.AbstractThreadUnsafePartition;
-import org.apache.cassandra.db.rows.BufferCell;
-import org.apache.cassandra.db.rows.Cell;
-import org.apache.cassandra.db.rows.CellPath;
-import org.apache.cassandra.db.rows.Row;
-import org.apache.cassandra.utils.ByteBufferUtil;
-import org.apache.cassandra.utils.FBUtilities;
-
-/**
- * Represents a single CQL Row in a base table, with both the currently persisted value and the update's value. The
- * values are stored in timestamp order, but also indicate whether they are from the currently persisted data, allowing a
- * {@link TemporalRow.Resolver} to resolve if the value is an old value that has been updated; if it sorts after the
- * update's value, then it does not qualify.
- */
-public class TemporalRow
-{
-    private static final int NO_TTL = LivenessInfo.NO_TTL;
-    private static final long NO_TIMESTAMP = LivenessInfo.NO_TIMESTAMP;
-    private static final int NO_DELETION_TIME = DeletionTime.LIVE.localDeletionTime();
-
-    public interface Resolver
-    {
-        /**
-         * @param cells Iterable of all cells for a certain TemporalRow's Cell, in timestamp sorted order
-         * @return      A single TemporalCell from the iterable which satisfies the resolution criteria, or null if
-         *              there is no cell which qualifies
-         */
-        TemporalCell resolve(Iterable<TemporalCell> cells);
-    }
-
-    /**
-     * Returns the first value in the iterable if it is from the set of persisted cells, and the cell which results from
-     * reconciliation of the remaining cells does not have the same value.
-     */
-    public static final Resolver oldValueIfUpdated = cells -> {
-        Iterator<TemporalCell> iterator = cells.iterator();
-        if (!iterator.hasNext())
-            return null;
-
-        TemporalCell initial = iterator.next();
-        if (initial.isNew || !iterator.hasNext())
-            return null;
-
-        TemporalCell value = initial;
-        while (iterator.hasNext())
-            value = value.reconcile(iterator.next());
-
-        return ByteBufferUtil.compareUnsigned(initial.value, value.value) != 0 ? initial : null;
-    };
-
-    public static final Resolver earliest = cells -> {
-        Iterator<TemporalCell> iterator = cells.iterator();
-        if (!iterator.hasNext())
-            return null;
-        return iterator.next();
-    };
-
-    public static final Resolver latest = cells -> {
-        Iterator<TemporalCell> iterator = cells.iterator();
-        if (!iterator.hasNext())
-            return null;
-
-        TemporalCell value = iterator.next();
-        while (iterator.hasNext())
-            value = value.reconcile(iterator.next());
-
-        return value;
-    };
-
-    private static class TemporalCell
-    {
-        public final ByteBuffer value;
-        public final long timestamp;
-        public final int ttl;
-        public final int localDeletionTime;
-        public final boolean isNew;
-
-        private TemporalCell(ByteBuffer value, long timestamp, int ttl, int localDeletionTime, boolean isNew)
-        {
-            this.value = value;
-            this.timestamp = timestamp;
-            this.ttl = ttl;
-            this.localDeletionTime = localDeletionTime;
-            this.isNew = isNew;
-        }
-
-        public TemporalCell reconcile(TemporalCell that)
-        {
-            int now = FBUtilities.nowInSeconds();
-            Conflicts.Resolution resolution = Conflicts.resolveRegular(that.timestamp,
-                                                                       that.isLive(now),
-                                                                       that.localDeletionTime,
-                                                                       that.value,
-                                                                       this.timestamp,
-                                                                       this.isLive(now),
-                                                                       this.localDeletionTime,
-                                                                       this.value);
-            assert resolution != Conflicts.Resolution.MERGE;
-            if (resolution == Conflicts.Resolution.LEFT_WINS)
-                return that;
-            return this;
-        }
-
-        private boolean isLive(int now)
-        {
-            return localDeletionTime == NO_DELETION_TIME || (ttl != NO_TTL && now < localDeletionTime);
-        }
-
-        public Cell cell(ColumnDefinition definition, CellPath cellPath)
-        {
-            return new BufferCell(definition, timestamp, ttl, localDeletionTime, value, cellPath);
-        }
-    }
-
-    private final ColumnFamilyStore baseCfs;
-    private final java.util.Set<ColumnIdentifier> viewPrimaryKey;
-    private final ByteBuffer basePartitionKey;
-    public final Map<ColumnIdentifier, ByteBuffer> clusteringColumns;
-    public final int nowInSec;
-    private final Map<ColumnIdentifier, Map<CellPath, SortedMap<Long, TemporalCell>>> columnValues = new HashMap<>();
-    private int viewClusteringTtl = NO_TTL;
-    private long viewClusteringTimestamp = NO_TIMESTAMP;
-    private int viewClusteringLocalDeletionTime = NO_DELETION_TIME;
-
-    TemporalRow(ColumnFamilyStore baseCfs, java.util.Set<ColumnIdentifier> viewPrimaryKey, ByteBuffer key, Row row, int nowInSec, boolean isNew)
-    {
-        this.baseCfs = baseCfs;
-        this.viewPrimaryKey = viewPrimaryKey;
-        this.basePartitionKey = key;
-        this.nowInSec = nowInSec;
-        clusteringColumns = new HashMap<>();
-        LivenessInfo liveness = row.primaryKeyLivenessInfo();
-        this.viewClusteringLocalDeletionTime = minValueIfSet(viewClusteringLocalDeletionTime, row.deletion().localDeletionTime(), NO_DELETION_TIME);
-        this.viewClusteringTimestamp = minValueIfSet(viewClusteringTimestamp, liveness.timestamp(), NO_TIMESTAMP);
-        this.viewClusteringTtl = minValueIfSet(viewClusteringTtl, liveness.ttl(), NO_TTL);
-
-        List<ColumnDefinition> clusteringDefs = baseCfs.metadata.clusteringColumns();
-        for (int i = 0; i < clusteringDefs.size(); i++)
-        {
-            ColumnDefinition cdef = clusteringDefs.get(i);
-            clusteringColumns.put(cdef.name, row.clustering().get(i));
-
-            addColumnValue(cdef.name, null, NO_TIMESTAMP, NO_TTL, NO_DELETION_TIME, row.clustering().get(i), isNew);
-        }
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        if (this == o) return true;
-        if (o == null || getClass() != o.getClass()) return false;
-
-        TemporalRow that = (TemporalRow) o;
-
-        if (!clusteringColumns.equals(that.clusteringColumns)) return false;
-        if (!basePartitionKey.equals(that.basePartitionKey)) return false;
-
-        return true;
-    }
-
-    @Override
-    public int hashCode()
-    {
-        int result = basePartitionKey.hashCode();
-        result = 31 * result + clusteringColumns.hashCode();
-        return result;
-    }
-
-    public void addColumnValue(ColumnIdentifier identifier,
-                               CellPath cellPath,
-                               long timestamp,
-                               int ttl,
-                               int localDeletionTime,
-                               ByteBuffer value,  boolean isNew)
-    {
-        if (!columnValues.containsKey(identifier))
-            columnValues.put(identifier, new HashMap<>());
-
-        Map<CellPath, SortedMap<Long, TemporalCell>> innerMap = columnValues.get(identifier);
-
-        if (!innerMap.containsKey(cellPath))
-            innerMap.put(cellPath, new TreeMap<>());
-
-        // If this column is part of the view's primary keys
-        if (viewPrimaryKey.contains(identifier))
-        {
-            this.viewClusteringTtl = minValueIfSet(this.viewClusteringTtl, ttl, NO_TTL);
-            this.viewClusteringTimestamp = minValueIfSet(this.viewClusteringTimestamp, timestamp, NO_TIMESTAMP);
-            this.viewClusteringLocalDeletionTime = minValueIfSet(this.viewClusteringLocalDeletionTime, localDeletionTime, NO_DELETION_TIME);
-        }
-
-        innerMap.get(cellPath).put(timestamp, new TemporalCell(value, timestamp, ttl, localDeletionTime, isNew));
-    }
-
-    private static int minValueIfSet(int existing, int update, int defaultValue)
-    {
-        if (existing == defaultValue)
-            return update;
-        if (update == defaultValue)
-            return existing;
-        return Math.min(existing, update);
-    }
-
-    private static long minValueIfSet(long existing, long update, long defaultValue)
-    {
-        if (existing == defaultValue)
-            return update;
-        if (update == defaultValue)
-            return existing;
-        return Math.min(existing, update);
-    }
-
-    public int viewClusteringTtl()
-    {
-        return viewClusteringTtl;
-    }
-
-    public long viewClusteringTimestamp()
-    {
-        return viewClusteringTimestamp;
-    }
-
-    public int viewClusteringLocalDeletionTime()
-    {
-        return viewClusteringLocalDeletionTime;
-    }
-
-    public void addCell(Cell cell, boolean isNew)
-    {
-        addColumnValue(cell.column().name, cell.path(), cell.timestamp(), cell.ttl(), cell.localDeletionTime(), cell.value(), isNew);
-    }
-
-    // The Definition here is actually the *base table* definition
-    public ByteBuffer clusteringValue(ColumnDefinition definition, Resolver resolver)
-    {
-        ColumnDefinition baseDefinition = definition.cfName.equals(baseCfs.name)
-                                          ? definition
-                                          : baseCfs.metadata.getColumnDefinition(definition.name);
-
-        if (baseDefinition.isPartitionKey())
-        {
-            if (baseDefinition.isOnAllComponents())
-                return basePartitionKey;
-            else
-            {
-                CompositeType keyComparator = (CompositeType) baseCfs.metadata.getKeyValidator();
-                ByteBuffer[] components = keyComparator.split(basePartitionKey);
-                return components[baseDefinition.position()];
-            }
-        }
-        else
-        {
-            ColumnIdentifier columnIdentifier = baseDefinition.name;
-
-            if (clusteringColumns.containsKey(columnIdentifier))
-                return clusteringColumns.get(columnIdentifier);
-
-            Collection<org.apache.cassandra.db.rows.Cell> val = values(definition, resolver);
-            if (val != null && val.size() == 1)
-                return Iterables.getOnlyElement(val).value();
-        }
-        return null;
-    }
-
-    public DeletionTime deletionTime(AbstractThreadUnsafePartition partition)
-    {
-        DeletionInfo deletionInfo = partition.deletionInfo();
-        if (!deletionInfo.getPartitionDeletion().isLive())
-            return deletionInfo.getPartitionDeletion();
-
-        Clustering baseClustering = baseClusteringBuilder().build();
-        RangeTombstone clusterTombstone = deletionInfo.rangeCovering(baseClustering);
-        if (clusterTombstone != null)
-            return clusterTombstone.deletionTime();
-
-        Row row = partition.getRow(baseClustering);
-        return row == null || row.deletion().isLive() ? DeletionTime.LIVE : row.deletion();
-    }
-
-    public Collection<org.apache.cassandra.db.rows.Cell> values(ColumnDefinition definition, Resolver resolver)
-    {
-        Map<CellPath, SortedMap<Long, TemporalCell>> innerMap = columnValues.get(definition.name);
-        if (innerMap == null)
-        {
-            return Collections.emptyList();
-        }
-
-        Collection<org.apache.cassandra.db.rows.Cell> value = new ArrayList<>();
-        for (Map.Entry<CellPath, SortedMap<Long, TemporalCell>> pathAndCells : innerMap.entrySet())
-        {
-            TemporalCell cell = resolver.resolve(pathAndCells.getValue().values());
-
-            if (cell != null)
-                value.add(cell.cell(definition, pathAndCells.getKey()));
-        }
-        return value;
-    }
-
-    public Slice baseSlice()
-    {
-        return baseClusteringBuilder().buildSlice();
-    }
-
-    private CBuilder baseClusteringBuilder()
-    {
-        CFMetaData metadata = baseCfs.metadata;
-        CBuilder builder = CBuilder.create(metadata.comparator);
-
-        ByteBuffer[] buffers = new ByteBuffer[clusteringColumns.size()];
-        for (Map.Entry<ColumnIdentifier, ByteBuffer> buffer : clusteringColumns.entrySet())
-            buffers[metadata.getColumnDefinition(buffer.getKey()).position()] = buffer.getValue();
-
-        for (ByteBuffer byteBuffer : buffers)
-            builder = builder.add(byteBuffer);
-
-        return builder;
-    }
-
-    static class Set implements Iterable<TemporalRow>
-    {
-        private final ColumnFamilyStore baseCfs;
-        private final java.util.Set<ColumnIdentifier> viewPrimaryKey;
-        private final ByteBuffer key;
-        public final DecoratedKey dk;
-        private final Map<Clustering, TemporalRow> clusteringToRow;
-        final int nowInSec = FBUtilities.nowInSeconds();
-
-        Set(ColumnFamilyStore baseCfs, java.util.Set<ColumnIdentifier> viewPrimaryKey, ByteBuffer key)
-        {
-            this.baseCfs = baseCfs;
-            this.viewPrimaryKey = viewPrimaryKey;
-            this.key = key;
-            this.dk = baseCfs.partitioner.decorateKey(key);
-            this.clusteringToRow = new HashMap<>();
-        }
-
-        public Iterator<TemporalRow> iterator()
-        {
-            return clusteringToRow.values().iterator();
-        }
-
-        public TemporalRow getClustering(Clustering clustering)
-        {
-            return clusteringToRow.get(clustering);
-        }
-
-        public void addRow(Row row, boolean isNew)
-        {
-            TemporalRow temporalRow = clusteringToRow.get(row.clustering());
-            if (temporalRow == null)
-            {
-                temporalRow = new TemporalRow(baseCfs, viewPrimaryKey, key, row, nowInSec, isNew);
-                clusteringToRow.put(row.clustering(), temporalRow);
-            }
-
-            for (Cell cell: row.cells())
-            {
-                temporalRow.addCell(cell, isNew);
-            }
-        }
-
-        public int size()
-        {
-            return clusteringToRow.size();
-        }
-    }
-}
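
The Resolver interface above is a small strategy: given one cell's versions in timestamp order, pick the version that matters. A standalone sketch of the oldValueIfUpdated rule, using a simplified cell and plain last-write-wins reconciliation (hypothetical types, not the Cassandra implementation):

import java.util.Arrays;
import java.util.Iterator;

public final class ResolverSketch
{
    static final class Cell
    {
        final String value; final long timestamp; final boolean isNew;
        Cell(String value, long timestamp, boolean isNew)
        { this.value = value; this.timestamp = timestamp; this.isNew = isNew; }
        // Last-write-wins stand-in for Conflicts.resolveRegular.
        Cell reconcile(Cell that) { return that.timestamp > this.timestamp ? that : this; }
    }

    // Returns the persisted cell only if the update actually changed its value.
    static Cell oldValueIfUpdated(Iterable<Cell> cells)
    {
        Iterator<Cell> it = cells.iterator();
        if (!it.hasNext())
            return null;
        Cell initial = it.next();
        if (initial.isNew || !it.hasNext())
            return null;                 // nothing persisted, or no newer cells
        Cell winner = initial;
        while (it.hasNext())
            winner = winner.reconcile(it.next());
        return winner.value.equals(initial.value) ? null : initial;
    }

    public static void main(String[] args)
    {
        Cell persisted = new Cell("old", 1, false);
        System.out.println(oldValueIfUpdated(Arrays.asList(persisted, new Cell("new", 2, true))).value); // old
        System.out.println(oldValueIfUpdated(Arrays.asList(persisted, new Cell("old", 2, true))));       // null
    }
}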

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
index 9c886b0..8f52bb1 100644
--- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
+++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java
@@ -87,7 +87,7 @@ public class KeyspaceMetrics
     public final LatencyMetrics casPropose;
     /** CAS Commit metrics */
     public final LatencyMetrics casCommit;
-
+    
     public final MetricNameFactory factory;
     private Keyspace keyspace;
     

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/metrics/MVWriteMetrics.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/metrics/MVWriteMetrics.java b/src/java/org/apache/cassandra/metrics/MVWriteMetrics.java
deleted file mode 100644
index 39a5574..0000000
--- a/src/java/org/apache/cassandra/metrics/MVWriteMetrics.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.metrics;
-
-import com.codahale.metrics.Counter;
-
-import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;
-
-public class MVWriteMetrics extends ClientRequestMetrics
-{
-    public final Counter viewReplicasAttempted;
-    public final Counter viewReplicasSuccess;
-
-    public MVWriteMetrics(String scope) {
-        super(scope);
-        viewReplicasAttempted = Metrics.counter(factory.createMetricName("ViewReplicasAttempted"));
-        viewReplicasSuccess = Metrics.counter(factory.createMetricName("ViewReplicasSuccess"));
-    }
-
-    public void release()
-    {
-        super.release();
-        Metrics.remove(factory.createMetricName("ViewReplicasAttempted"));
-        Metrics.remove(factory.createMetricName("ViewReplicasSuccess"));
-    }
-}
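
The lifecycle above is the usual Dropwizard Metrics pattern: counters are created against a shared registry in the constructor and unregistered by name in release(). A minimal standalone sketch against a plain MetricRegistry (class and metric names here are illustrative, not Cassandra's factory plumbing):

import com.codahale.metrics.Counter;
import com.codahale.metrics.MetricRegistry;

public final class CounterLifecycleSketch
{
    private final MetricRegistry registry;
    private final String attemptedName;
    public final Counter attempted;

    public CounterLifecycleSketch(MetricRegistry registry, String scope)
    {
        this.registry = registry;
        this.attemptedName = MetricRegistry.name("ViewWrite", scope, "Attempted");
        this.attempted = registry.counter(attemptedName);   // creates or reuses
    }

    public void release()
    {
        registry.remove(attemptedName);                     // unregister on teardown
    }

    public static void main(String[] args)
    {
        CounterLifecycleSketch m = new CounterLifecycleSketch(new MetricRegistry(), "MVWrite");
        m.attempted.inc();
        System.out.println(m.attempted.getCount());         // 1
        m.release();
    }
}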

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/net/MessagingService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java
index 4f15da2..a5f3601 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -134,8 +134,6 @@ public final class MessagingService implements MessagingServiceMBean
         PAXOS_PROPOSE,
         PAXOS_COMMIT,
         @Deprecated PAGED_RANGE,
-        BATCHLOG_MUTATION,
-        MATERIALIZED_VIEW_MUTATION,
         // remember to add new verbs at the end, since we serialize by ordinal
         UNUSED_1,
         UNUSED_2,
@@ -147,8 +145,6 @@ public final class MessagingService implements MessagingServiceMBean
     {{
         put(Verb.MUTATION, Stage.MUTATION);
         put(Verb.COUNTER_MUTATION, Stage.COUNTER_MUTATION);
-        put(Verb.MATERIALIZED_VIEW_MUTATION, Stage.MATERIALIZED_VIEW_MUTATION);
-        put(Verb.BATCHLOG_MUTATION, Stage.BATCHLOG_MUTATION);
         put(Verb.READ_REPAIR, Stage.MUTATION);
         put(Verb.TRUNCATE, Stage.MUTATION);
         put(Verb.PAXOS_PREPARE, Stage.MUTATION);
@@ -207,8 +203,6 @@ public final class MessagingService implements MessagingServiceMBean
         put(Verb.INTERNAL_RESPONSE, CallbackDeterminedSerializer.instance);
 
         put(Verb.MUTATION, Mutation.serializer);
-        put(Verb.BATCHLOG_MUTATION, Mutation.serializer);
-        put(Verb.MATERIALIZED_VIEW_MUTATION, Mutation.serializer);
         put(Verb.READ_REPAIR, Mutation.serializer);
         put(Verb.READ, ReadCommand.serializer);
         //put(Verb.RANGE_SLICE, ReadCommand.legacyRangeSliceCommandSerializer);
@@ -235,8 +229,6 @@ public final class MessagingService implements MessagingServiceMBean
     public static final EnumMap<Verb, IVersionedSerializer<?>> callbackDeserializers = new EnumMap<Verb, IVersionedSerializer<?>>(Verb.class)
     {{
         put(Verb.MUTATION, WriteResponse.serializer);
-        put(Verb.BATCHLOG_MUTATION, WriteResponse.serializer);
-        put(Verb.MATERIALIZED_VIEW_MUTATION, WriteResponse.serializer);
         put(Verb.READ_REPAIR, WriteResponse.serializer);
         put(Verb.COUNTER_MUTATION, WriteResponse.serializer);
         put(Verb.RANGE_SLICE, ReadResponse.legacyRangeSliceReplySerializer);
@@ -299,8 +291,6 @@ public final class MessagingService implements MessagingServiceMBean
      */
     public static final EnumSet<Verb> DROPPABLE_VERBS = EnumSet.of(Verb._TRACE,
                                                                    Verb.MUTATION,
-                                                                   Verb.BATCHLOG_MUTATION, //FIXME: should this be droppable??
-                                                                   Verb.MATERIALIZED_VIEW_MUTATION,
                                                                    Verb.COUNTER_MUTATION,
                                                                    Verb.READ_REPAIR,
                                                                    Verb.READ,
@@ -628,10 +618,7 @@ public final class MessagingService implements MessagingServiceMBean
                            ConsistencyLevel consistencyLevel,
                            boolean allowHints)
     {
-        assert message.verb == Verb.MUTATION
-               || message.verb == Verb.BATCHLOG_MUTATION
-               || message.verb == Verb.MATERIALIZED_VIEW_MUTATION
-               || message.verb == Verb.COUNTER_MUTATION;
+        assert message.verb == Verb.MUTATION || message.verb == Verb.COUNTER_MUTATION;
         int messageId = nextId();
 
         CallbackInfo previous = callbacks.put(messageId,
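
The comment kept in context above ("remember to add new verbs at the end, since we serialize by ordinal") is the constraint this enum lives under: verbs go on the wire as Enum.ordinal(), so the meaning of a stored number depends on enum position. A toy sketch of what goes wrong with a mid-enum insert (illustrative enums, not the real Verb):

public final class VerbOrdinalSketch
{
    enum Old { MUTATION, READ, TRUNCATE }
    enum Inserted { MUTATION, BATCHLOG_MUTATION, READ, TRUNCATE } // mid-enum insert
    enum Appended { MUTATION, READ, TRUNCATE, NEW_VERB }          // append-only

    public static void main(String[] args)
    {
        int onWire = Old.READ.ordinal();               // 1, as written by an old node
        System.out.println(Inserted.values()[onWire]); // BATCHLOG_MUTATION: misdecoded
        System.out.println(Appended.values()[onWire]); // READ: still decodes correctly
    }
}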

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
index 1c21e41..7326fa9 100644
--- a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
+++ b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
@@ -307,9 +307,7 @@ public final class LegacySchemaMigrator
                                     defaultValidator);
         }
 
-        // The legacy schema did not have views, so we know that we are not loading a materialized view
-        boolean isMaterializedView = false;
-        CFMetaData cfm = CFMetaData.create(ksName, cfName, cfId, isDense, isCompound, isSuper, isCounter, isMaterializedView, columnDefs);
+        CFMetaData cfm = CFMetaData.create(ksName, cfName, cfId, isDense, isCompound, isSuper, isCounter, columnDefs);
 
         cfm.readRepairChance(tableRow.getDouble("read_repair_chance"));
         
cfm.dcLocalReadRepairChance(tableRow.getDouble("local_read_repair_chance"));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/schema/MaterializedViews.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/MaterializedViews.java b/src/java/org/apache/cassandra/schema/MaterializedViews.java
deleted file mode 100644
index 1c55736..0000000
--- a/src/java/org/apache/cassandra/schema/MaterializedViews.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.schema;
-
-
-import java.util.Iterator;
-import java.util.Optional;
-
-import com.google.common.collect.ImmutableMap;
-
-import org.apache.cassandra.config.MaterializedViewDefinition;
-
-import static com.google.common.collect.Iterables.filter;
-
-public final class MaterializedViews implements Iterable<MaterializedViewDefinition>
-{
-    private final ImmutableMap<String, MaterializedViewDefinition> materializedViews;
-
-    private MaterializedViews(Builder builder)
-    {
-        materializedViews = builder.materializedViews.build();
-    }
-
-    public static Builder builder()
-    {
-        return new Builder();
-    }
-
-    public static MaterializedViews none()
-    {
-        return builder().build();
-    }
-
-    public Iterator<MaterializedViewDefinition> iterator()
-    {
-        return materializedViews.values().iterator();
-    }
-
-    public int size()
-    {
-        return materializedViews.size();
-    }
-
-    public boolean isEmpty()
-    {
-        return materializedViews.isEmpty();
-    }
-
-    /**
-     * Get the materialized view with the specified name
-     *
-     * @param name a non-qualified materialized view name
-     * @return an empty {@link Optional} if the materialized view name is not found; a non-empty optional of {@link MaterializedViewDefinition} otherwise
-     */
-    public Optional<MaterializedViewDefinition> get(String name)
-    {
-        return Optional.ofNullable(materializedViews.get(name));
-    }
-
-    /**
-     * Create a MaterializedViews instance with the provided materialized view added
-     */
-    public MaterializedViews with(MaterializedViewDefinition materializedView)
-    {
-        if (get(materializedView.viewName).isPresent())
-            throw new IllegalStateException(String.format("Materialized View 
%s already exists", materializedView.viewName));
-
-        return builder().add(this).add(materializedView).build();
-    }
-
-    /**
-     * Creates a MaterializedViews instance with the materializedView with the provided name removed
-     */
-    public MaterializedViews without(String name)
-    {
-        MaterializedViewDefinition materializedView =
-        get(name).orElseThrow(() -> new IllegalStateException(String.format("Materialized View %s doesn't exist", name)));
-
-        return builder().add(filter(this, v -> v != materializedView)).build();
-    }
-
-    /**
-     * Creates a MaterializedViews instance which contains an updated materialized view
-     */
-    public MaterializedViews replace(MaterializedViewDefinition materializedView)
-    {
-        return without(materializedView.viewName).with(materializedView);
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-        return this == o || (o instanceof MaterializedViews && materializedViews.equals(((MaterializedViews) o).materializedViews));
-    }
-
-    @Override
-    public int hashCode()
-    {
-        return materializedViews.hashCode();
-    }
-
-    @Override
-    public String toString()
-    {
-        return materializedViews.values().toString();
-    }
-
-    public static final class Builder
-    {
-        final ImmutableMap.Builder<String, MaterializedViewDefinition> materializedViews = new ImmutableMap.Builder<>();
-
-        private Builder()
-        {
-        }
-
-        public MaterializedViews build()
-        {
-            return new MaterializedViews(this);
-        }
-
-        public Builder add(MaterializedViewDefinition materializedView)
-        {
-            materializedViews.put(materializedView.viewName, materializedView);
-            return this;
-        }
-
-        public Builder add(Iterable<MaterializedViewDefinition> materializedViews)
-        {
-            materializedViews.forEach(this::add);
-            return this;
-        }
-    }
-}
\ No newline at end of file
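
MaterializedViews is a persistent collection: with(), without(), and replace() never mutate, they build and return a fresh immutable instance, so readers can hold a consistent snapshot without locking. A generic sketch of the same copy-on-write pattern using only standard collections (hypothetical class, not Cassandra code):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class CopyOnWriteNames<T>
{
    private final Map<String, T> items;

    private CopyOnWriteNames(Map<String, T> items) { this.items = Collections.unmodifiableMap(items); }

    static <T> CopyOnWriteNames<T> none() { return new CopyOnWriteNames<>(new HashMap<>()); }

    CopyOnWriteNames<T> with(String name, T value)
    {
        if (items.containsKey(name))
            throw new IllegalStateException(name + " already exists");
        Map<String, T> copy = new HashMap<>(items);
        copy.put(name, value);
        return new CopyOnWriteNames<>(copy);
    }

    CopyOnWriteNames<T> without(String name)
    {
        if (!items.containsKey(name))
            throw new IllegalStateException(name + " doesn't exist");
        Map<String, T> copy = new HashMap<>(items);
        copy.remove(name);
        return new CopyOnWriteNames<>(copy);
    }

    // An update is expressed as remove-then-add, like MaterializedViews.replace().
    CopyOnWriteNames<T> replace(String name, T value) { return without(name).with(name, value); }

    @Override public String toString() { return items.toString(); }

    public static void main(String[] args)
    {
        CopyOnWriteNames<String> v1 = CopyOnWriteNames.<String>none().with("mv1", "a").with("mv2", "a");
        CopyOnWriteNames<String> v2 = v1.replace("mv2", "b");
        System.out.println(v1); // earlier snapshot is unchanged
        System.out.println(v2);
    }
}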

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
index 2150f4a..2bc7b0c 100644
--- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
+++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java
@@ -76,13 +76,12 @@ public final class SchemaKeyspace
     public static final String COLUMNS = "columns";
     public static final String DROPPED_COLUMNS = "dropped_columns";
     public static final String TRIGGERS = "triggers";
-    public static final String MATERIALIZED_VIEWS = "materialized_views";
     public static final String TYPES = "types";
     public static final String FUNCTIONS = "functions";
     public static final String AGGREGATES = "aggregates";
 
     public static final List<String> ALL =
-        ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, MATERIALIZED_VIEWS, TYPES, FUNCTIONS, AGGREGATES);
+        ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, TYPES, FUNCTIONS, AGGREGATES);
 
     private static final CFMetaData Keyspaces =
         compile(KEYSPACES,
@@ -153,18 +152,6 @@ public final class SchemaKeyspace
                 + "trigger_options map<text, text>,"
                 + "PRIMARY KEY ((keyspace_name), table_name, trigger_name))");
 
-    private static final CFMetaData MaterializedViews =
-        compile(MATERIALIZED_VIEWS,
-                "materialized views definitions",
-                "CREATE TABLE %s ("
-                + "keyspace_name text,"
-                + "table_name text,"
-                + "view_name text,"
-                + "target_columns list<text>,"
-                + "clustering_columns list<text>,"
-                + "included_columns list<text>,"
-                + "PRIMARY KEY ((keyspace_name), table_name, view_name))");
-
     private static final CFMetaData Types =
         compile(TYPES,
                 "user defined type definitions",
@@ -206,7 +193,7 @@ public final class SchemaKeyspace
                 + "PRIMARY KEY ((keyspace_name), aggregate_name, signature))");
 
     public static final List<CFMetaData> All =
-        ImmutableList.of(Keyspaces, Tables, Columns, Triggers, DroppedColumns, MaterializedViews, Types, Functions, Aggregates);
+        ImmutableList.of(Keyspaces, Tables, Columns, DroppedColumns, Triggers, Types, Functions, Aggregates);
 
     private static CFMetaData compile(String name, String description, String schema)
     {
@@ -701,8 +688,6 @@ public final class SchemaKeyspace
         Mutation mutation = new Mutation(NAME, getSchemaKSDecoratedKey(keyspace.name));
         for (CFMetaData schemaTable : All)
             mutation.add(PartitionUpdate.fullPartitionDelete(schemaTable, mutation.key(), timestamp, nowInSec));
-        mutation.add(PartitionUpdate.fullPartitionDelete(SystemKeyspace.BuiltMaterializedViews, mutation.key(), timestamp, nowInSec));
-        mutation.add(PartitionUpdate.fullPartitionDelete(SystemKeyspace.MaterializedViewsBuildsInProgress, mutation.key(), timestamp, nowInSec));
         return mutation;
     }
 
@@ -845,9 +830,6 @@ public final class SchemaKeyspace
 
             for (TriggerMetadata trigger : table.getTriggers())
                 addTriggerToSchemaMutation(table, trigger, timestamp, mutation);
-
-            for (MaterializedViewDefinition materializedView: table.getMaterializedViews())
-                addMaterializedViewToSchemaMutation(table, materializedView, timestamp, mutation);
         }
     }
 
@@ -941,22 +923,6 @@ public final class SchemaKeyspace
         for (TriggerMetadata trigger : triggerDiff.entriesOnlyOnRight().values())
             addTriggerToSchemaMutation(newTable, trigger, timestamp, mutation);
 
-        MapDifference<String, MaterializedViewDefinition> materializedViewDiff = materializedViewsDiff(oldTable.getMaterializedViews(), newTable.getMaterializedViews());
-
-        // dropped materialized views
-        for (MaterializedViewDefinition materializedView : materializedViewDiff.entriesOnlyOnLeft().values())
-            dropMaterializedViewFromSchemaMutation(oldTable, materializedView, timestamp, mutation);
-
-        // newly created materialized views
-        for (MaterializedViewDefinition materializedView : materializedViewDiff.entriesOnlyOnRight().values())
-            addMaterializedViewToSchemaMutation(oldTable, materializedView, timestamp, mutation);
-
-        // updated materialized views need to be updated
-        for (MapDifference.ValueDifference<MaterializedViewDefinition> diff : materializedViewDiff.entriesDiffering().values())
-        {
-            addUpdatedMaterializedViewDefinitionToSchemaMutation(oldTable, diff.rightValue(), timestamp, mutation);
-        }
-
         return mutation;
     }
 
@@ -971,17 +937,6 @@ public final class SchemaKeyspace
         return Maps.difference(beforeMap, afterMap);
     }
 
-    private static MapDifference<String, MaterializedViewDefinition> materializedViewsDiff(MaterializedViews before, MaterializedViews after)
-    {
-        Map<String, MaterializedViewDefinition> beforeMap = new HashMap<>();
-        before.forEach(v -> beforeMap.put(v.viewName, v));
-
-        Map<String, MaterializedViewDefinition> afterMap = new HashMap<>();
-        after.forEach(v -> afterMap.put(v.viewName, v));
-
-        return Maps.difference(beforeMap, afterMap);
-    }
-
     public static Mutation makeDropTableMutation(KeyspaceMetadata keyspace, CFMetaData table, long timestamp)
     {
         // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631).
@@ -995,9 +950,6 @@ public final class SchemaKeyspace
         for (TriggerMetadata trigger : table.getTriggers())
             dropTriggerFromSchemaMutation(table, trigger, timestamp, mutation);
 
-        for (MaterializedViewDefinition materializedView : table.getMaterializedViews())
-            dropMaterializedViewFromSchemaMutation(table, materializedView, timestamp, mutation);
-
         return mutation;
     }
 
@@ -1062,12 +1014,8 @@ public final class SchemaKeyspace
         Triggers triggers =
             readSchemaPartitionForTableAndApply(TRIGGERS, keyspace, table, SchemaKeyspace::createTriggersFromTriggersPartition);
 
-        MaterializedViews views =
-            readSchemaPartitionForTableAndApply(MATERIALIZED_VIEWS, keyspace, table, SchemaKeyspace::createMaterializedViewsFromMaterializedViewsPartition);
-
-        return createTableFromTableRowAndColumns(row, columns).droppedColumns(droppedColumns)
-                                                              .triggers(triggers)
-                                                              .materializedViews(views);
+        return createTableFromTableRowAndColumns(row, columns).droppedColumns(droppedColumns)
+                                                              .triggers(triggers);
     }
 
     public static CFMetaData createTableFromTableRowAndColumns(UntypedResultSet.Row row, List<ColumnDefinition> columns)
@@ -1084,9 +1032,8 @@ public final class SchemaKeyspace
         boolean isCounter = flags.contains(CFMetaData.Flag.COUNTER);
         boolean isDense = flags.contains(CFMetaData.Flag.DENSE);
         boolean isCompound = flags.contains(CFMetaData.Flag.COMPOUND);
-        boolean isMaterializedView = flags.contains(CFMetaData.Flag.MATERIALIZEDVIEW);
 
-        CFMetaData cfm = CFMetaData.create(keyspace, table, id, isDense, isCompound, isSuper, isCounter, isMaterializedView, columns);
+        CFMetaData cfm = CFMetaData.create(keyspace, table, id, isDense, isCompound, isSuper, isCounter, columns);
 
         Map<String, String> compaction = new HashMap<>(row.getTextMap("compaction"));
         Class<? extends AbstractCompactionStrategy> compactionStrategyClass =
@@ -1274,103 +1221,6 @@ public final class SchemaKeyspace
     }
 
     /*
-     * Materialized view metadata serialization/deserialization.
-     */
-
-    private static void addMaterializedViewToSchemaMutation(CFMetaData table, MaterializedViewDefinition materializedView, long timestamp, Mutation mutation)
-    {
-        RowUpdateBuilder builder = new RowUpdateBuilder(MaterializedViews, timestamp, mutation)
-            .clustering(table.cfName, materializedView.viewName);
-
-        for (ColumnIdentifier partitionColumn : materializedView.partitionColumns)
-            builder.addListEntry("target_columns", partitionColumn.toString());
-        for (ColumnIdentifier clusteringColumn : materializedView.clusteringColumns)
-            builder = builder.addListEntry("clustering_columns", clusteringColumn.toString());
-        for (ColumnIdentifier includedColumn : materializedView.included)
-            builder = builder.addListEntry("included_columns", includedColumn.toString());
-
-        builder.build();
-    }
-
-    private static void dropMaterializedViewFromSchemaMutation(CFMetaData table, MaterializedViewDefinition materializedView, long timestamp, Mutation mutation)
-    {
-        RowUpdateBuilder.deleteRow(MaterializedViews, timestamp, mutation, table.cfName, materializedView.viewName);
-    }
-
-    private static void addUpdatedMaterializedViewDefinitionToSchemaMutation(CFMetaData table, MaterializedViewDefinition materializedView, long timestamp, Mutation mutation)
-    {
-        RowUpdateBuilder builder = new RowUpdateBuilder(MaterializedViews, timestamp, mutation)
-                                   .clustering(table.cfName, materializedView.viewName);
-
-        builder.resetCollection("target_columns");
-        for (ColumnIdentifier partitionColumn : materializedView.partitionColumns)
-            builder.addListEntry("target_columns", partitionColumn.toString());
-
-        builder.resetCollection("clustering_columns");
-        for (ColumnIdentifier clusteringColumn : materializedView.clusteringColumns)
-            builder = builder.addListEntry("clustering_columns", clusteringColumn.toString());
-
-        builder.resetCollection("included_columns");
-        for (ColumnIdentifier includedColumn : materializedView.included)
-            builder = builder.addListEntry("included_columns", includedColumn.toString());
-
-        builder.build();
-    }
-
-    /**
-     * Deserialize materialized views from storage-level representation.
-     *
-     * @param partition storage-level partition containing the materialized view definitions
-     * @return the list of processed MaterializedViewDefinitions
-     */
-    private static MaterializedViews createMaterializedViewsFromMaterializedViewsPartition(RowIterator partition)
-    {
-        MaterializedViews.Builder views = org.apache.cassandra.schema.MaterializedViews.builder();
-        String query = String.format("SELECT * FROM %s.%s", NAME, MATERIALIZED_VIEWS);
-        for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition))
-        {
-            MaterializedViewDefinition mv = createMaterializedViewFromMaterializedViewRow(row);
-            views.add(mv);
-        }
-        return views.build();
-    }
-
-    private static MaterializedViewDefinition createMaterializedViewFromMaterializedViewRow(UntypedResultSet.Row row)
-    {
-        String name = row.getString("view_name");
-        List<String> partitionColumnNames = row.getList("target_columns", UTF8Type.instance);
-
-        String cfName = row.getString("columnfamily_name");
-        List<String> clusteringColumnNames = row.getList("clustering_columns", UTF8Type.instance);
-
-        List<ColumnIdentifier> partitionColumns = new ArrayList<>();
-        for (String columnName : partitionColumnNames)
-        {
-            partitionColumns.add(ColumnIdentifier.getInterned(columnName, true));
-        }
-
-        List<ColumnIdentifier> clusteringColumns = new ArrayList<>();
-        for (String columnName : clusteringColumnNames)
-        {
-            clusteringColumns.add(ColumnIdentifier.getInterned(columnName, true));
-        }
-
-        List<String> includedColumnNames = row.getList("included_columns", UTF8Type.instance);
-        Set<ColumnIdentifier> includedColumns = new HashSet<>();
-        if (includedColumnNames != null)
-        {
-            for (String columnName : includedColumnNames)
-                includedColumns.add(ColumnIdentifier.getInterned(columnName, true));
-        }
-
-        return new MaterializedViewDefinition(cfName,
-                                              name,
-                                              partitionColumns,
-                                              clusteringColumns,
-                                              includedColumns);
-    }
-
-    /*
      * UDF metadata serialization/deserialization.
      */
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
index 9a57f45..6696e10 100644
--- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
+++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java
@@ -196,7 +196,7 @@ public abstract class AbstractReadExecutor
             return new SpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas);
     }
 
-    public static class NeverSpeculatingReadExecutor extends AbstractReadExecutor
+    private static class NeverSpeculatingReadExecutor extends AbstractReadExecutor
     {
         public NeverSpeculatingReadExecutor(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas)
         {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
index e3ba66e..8978034 100644
--- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java
@@ -47,7 +47,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW
     public final ConsistencyLevel consistencyLevel;
     protected final Runnable callback;
     protected final Collection<InetAddress> pendingEndpoints;
-    protected final WriteType writeType;
+    private final WriteType writeType;
     private static final AtomicIntegerFieldUpdater<AbstractWriteResponseHandler> failuresUpdater
         = AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, "failures");
     private volatile int failures = 0;

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
deleted file mode 100644
index ac44923..0000000
--- a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service;
-
-import java.net.InetAddress;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-
-import org.apache.cassandra.exceptions.WriteFailureException;
-import org.apache.cassandra.exceptions.WriteTimeoutException;
-import org.apache.cassandra.net.MessageIn;
-
-public class BatchlogResponseHandler<T> extends AbstractWriteResponseHandler<T>
-{
-    AbstractWriteResponseHandler<T> wrapped;
-    BatchlogCleanup cleanup;
-    protected volatile int requiredBeforeFinish;
-    private static final AtomicIntegerFieldUpdater<BatchlogResponseHandler> requiredBeforeFinishUpdater
-            = AtomicIntegerFieldUpdater.newUpdater(BatchlogResponseHandler.class, "requiredBeforeFinish");
-
-    public BatchlogResponseHandler(AbstractWriteResponseHandler<T> wrapped, int requiredBeforeFinish, BatchlogCleanup cleanup)
-    {
-        super(wrapped.keyspace, wrapped.naturalEndpoints, wrapped.pendingEndpoints, wrapped.consistencyLevel, wrapped.callback, wrapped.writeType);
-        this.wrapped = wrapped;
-        this.requiredBeforeFinish = requiredBeforeFinish;
-        this.cleanup = cleanup;
-    }
-
-    protected int ackCount()
-    {
-        return wrapped.ackCount();
-    }
-
-    public void response(MessageIn<T> msg)
-    {
-        wrapped.response(msg);
-        if (requiredBeforeFinishUpdater.decrementAndGet(this) == 0)
-            cleanup.run();
-    }
-
-    public boolean isLatencyForSnitch()
-    {
-        return wrapped.isLatencyForSnitch();
-    }
-
-    public void onFailure(InetAddress from)
-    {
-        wrapped.onFailure(from);
-    }
-
-    public void assureSufficientLiveNodes()
-    {
-        wrapped.assureSufficientLiveNodes();
-    }
-
-    public void get() throws WriteTimeoutException, WriteFailureException
-    {
-        wrapped.get();
-    }
-
-    protected int totalBlockFor()
-    {
-        return wrapped.totalBlockFor();
-    }
-
-    protected int totalEndpoints()
-    {
-        return wrapped.totalEndpoints();
-    }
-
-    protected boolean waitingFor(InetAddress from)
-    {
-        return wrapped.waitingFor(from);
-    }
-
-    protected void signal()
-    {
-        wrapped.signal();
-    }
-
-    public static class BatchlogCleanup
-    {
-        private final BatchlogCleanupCallback callback;
-
-        protected volatile int mutationsWaitingFor;
-        private static final AtomicIntegerFieldUpdater<BatchlogCleanup> mutationsWaitingForUpdater
-            = AtomicIntegerFieldUpdater.newUpdater(BatchlogCleanup.class, "mutationsWaitingFor");
-
-        public BatchlogCleanup(int mutationsWaitingFor, BatchlogCleanupCallback callback)
-        {
-            this.mutationsWaitingFor = mutationsWaitingFor;
-            this.callback = callback;
-        }
-
-        public void run()
-        {
-            if (mutationsWaitingForUpdater.decrementAndGet(this) == 0)
-                callback.invoke();
-        }
-    }
-
-    public interface BatchlogCleanupCallback
-    {
-        void invoke();
-    }
-}
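
Both BatchlogResponseHandler.response() and BatchlogCleanup.run() above use the same lock-free countdown idiom: an AtomicIntegerFieldUpdater decrements a volatile int, and the thread that reaches zero fires the callback exactly once. A standalone sketch of that idiom (hypothetical names):

import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

public final class CountdownSketch
{
    protected volatile int remaining;
    private static final AtomicIntegerFieldUpdater<CountdownSketch> updater =
        AtomicIntegerFieldUpdater.newUpdater(CountdownSketch.class, "remaining");
    private final Runnable onZero;

    CountdownSketch(int remaining, Runnable onZero)
    {
        this.remaining = remaining;
        this.onZero = onZero;
    }

    void ack()
    {
        // decrementAndGet is atomic, so exactly one caller observes zero.
        if (updater.decrementAndGet(this) == 0)
            onZero.run();
    }

    public static void main(String[] args)
    {
        CountdownSketch c = new CountdownSketch(3, () -> System.out.println("cleanup"));
        c.ack(); c.ack(); c.ack();   // prints "cleanup" once, on the third ack
    }
}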

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/service/CassandraDaemon.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index fddf593..548cbc7 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -287,24 +287,6 @@ public class CassandraDaemon
             }
         }
 
-        Runnable indexRebuild = new Runnable()
-        {
-            @Override
-            public void run()
-            {
-                for (Keyspace keyspace : Keyspace.all())
-                {
-                    for (ColumnFamilyStore cf: keyspace.getColumnFamilyStores())
-                    {
-                        cf.materializedViewManager.buildAllViews();
-                    }
-                }
-            }
-        };
-
-        ScheduledExecutors.optionalTasks.schedule(indexRebuild, StorageService.RING_DELAY, TimeUnit.MILLISECONDS);
-
-
         SystemKeyspace.finishStartup();
 
         // start server internals
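
The removed block deferred view rebuilds until the ring had settled by scheduling a one-shot task. The same shape with a plain ScheduledExecutorService (standing in for ScheduledExecutors.optionalTasks, with a short illustrative delay in place of StorageService.RING_DELAY):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public final class DeferredStartupTaskSketch
{
    public static void main(String[] args)
    {
        ScheduledExecutorService optionalTasks = Executors.newSingleThreadScheduledExecutor();
        Runnable indexRebuild = () -> System.out.println("rebuild all views once the ring has settled");
        optionalTasks.schedule(indexRebuild, 3, TimeUnit.SECONDS);
        optionalTasks.shutdown(); // the pending delayed task still runs, then the executor exits
    }
}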

http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index be11c77..2c5a2ab 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -31,10 +31,6 @@ import com.google.common.base.Predicate;
 import com.google.common.cache.CacheLoader;
 import com.google.common.collect.*;
 import com.google.common.util.concurrent.Uninterruptibles;
-
-import org.apache.cassandra.db.view.MaterializedViewManager;
-import org.apache.cassandra.db.view.MaterializedViewUtils;
-import org.apache.cassandra.metrics.*;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,6 +58,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy;
 import org.apache.cassandra.locator.IEndpointSnitch;
 import org.apache.cassandra.locator.LocalStrategy;
 import org.apache.cassandra.locator.TokenMetadata;
+import org.apache.cassandra.metrics.*;
 import org.apache.cassandra.net.*;
 import org.apache.cassandra.service.paxos.*;
 import org.apache.cassandra.tracing.Tracing;
@@ -94,7 +91,6 @@ public class StorageProxy implements StorageProxyMBean
     private static final ClientRequestMetrics writeMetrics = new ClientRequestMetrics("Write");
     private static final CASClientRequestMetrics casWriteMetrics = new CASClientRequestMetrics("CASWrite");
     private static final CASClientRequestMetrics casReadMetrics = new CASClientRequestMetrics("CASRead");
-    private static final MVWriteMetrics mvWriteMetrics = new MVWriteMetrics("MVWrite");
 
     private static final double CONCURRENT_SUBREQUESTS_MARGIN = 0.10;
 
@@ -122,7 +118,7 @@ public class StorageProxy implements StorageProxyMBean
             throws OverloadedException
             {
                 assert mutation instanceof Mutation;
-                sendToHintedEndpoints((Mutation) mutation, targets, responseHandler, localDataCenter, Stage.MUTATION);
+                sendToHintedEndpoints((Mutation) mutation, targets, responseHandler, localDataCenter);
             }
         };
 
@@ -626,80 +622,6 @@ public class StorageProxy implements StorageProxyMBean
         Tracing.trace("Wrote hint to satisfy CL.ANY after no replicas 
acknowledged the write");
     }
 
-    /**
-     * Use this method to have these Mutations applied
-     * across all replicas.
-     *
-     * @param mutations the mutations to be applied across the replicas
-     */
-    public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations)
-    throws UnavailableException, OverloadedException, WriteTimeoutException
-    {
-        Tracing.trace("Determining replicas for mutation");
-        final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress());
-
-        long startTime = System.nanoTime();
-        List<WriteResponseHandlerWrapper> wrappers = new ArrayList<>(mutations.size());
-
-        try
-        {
-            Token baseToken = StorageService.getPartitioner().getToken(dataKey);
-
-            ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
-
-            //Since the base -> view replication is 1:1 we only need to store the BL locally
-            final Collection<InetAddress> batchlogEndpoints = Collections.singleton(FBUtilities.getBroadcastAddress());
-            final UUID batchUUID = UUIDGen.getTimeUUID();
-            BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
-                                                                                          () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID));
-
-            // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
-            for (Mutation mutation : mutations)
-            {
-                String keyspaceName = mutation.getKeyspaceName();
-                Token tk = mutation.key().getToken();
-                List<InetAddress> naturalEndpoints = Lists.newArrayList(MaterializedViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk));
-
-                WriteResponseHandlerWrapper wrapper = wrapMVBatchResponseHandler(mutation,
-                                                                                 consistencyLevel,
-                                                                                 consistencyLevel,
-                                                                                 naturalEndpoints,
-                                                                                 WriteType.BATCH,
-                                                                                 cleanup);
-
-                wrappers.add(wrapper);
-
-                //Apply to local batchlog memtable in this thread
-                BatchlogManager.getBatchlogMutationFor(mutations, batchUUID, MessagingService.current_version).apply();
-            }
-
-            // now actually perform the writes and wait for them to complete
-            asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.MATERIALIZED_VIEW_MUTATION);
-        }
-        catch (WriteTimeoutException ex)
-        {
-            mvWriteMetrics.timeouts.mark();
-            Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor);
-            throw ex;
-        }
-        catch (UnavailableException e)
-        {
-            mvWriteMetrics.unavailables.mark();
-            Tracing.trace("Unavailable");
-            throw e;
-        }
-        catch (OverloadedException e)
-        {
-            mvWriteMetrics.unavailables.mark();
-            Tracing.trace("Overloaded");
-            throw e;
-        }
-        finally
-        {
-            mvWriteMetrics.addNano(System.nanoTime() - startTime);
-        }
-    }
-
     @SuppressWarnings("unchecked")
     public static void mutateWithTriggers(Collection<? extends IMutation> mutations,
                                           ConsistencyLevel consistencyLevel,
@@ -708,17 +630,12 @@ public class StorageProxy implements StorageProxyMBean
     {
         Collection<Mutation> augmented = TriggerExecutor.instance.execute(mutations);
 
-        boolean updatesView = MaterializedViewManager.updatesAffectView(mutations, true);
-
         if (augmented != null)
-            mutateAtomically(augmented, consistencyLevel, updatesView);
+            mutateAtomically(augmented, consistencyLevel);
+        else if (mutateAtomically)
+            mutateAtomically((Collection<Mutation>) mutations, consistencyLevel);
         else
-        {
-            if (mutateAtomically || updatesView)
-                mutateAtomically((Collection<Mutation>) mutations, consistencyLevel, updatesView);
-            else
-                mutate(mutations, consistencyLevel);
-        }
+            mutate(mutations, consistencyLevel);
     }
 
     /**
@@ -729,11 +646,8 @@ public class StorageProxy implements StorageProxyMBean
      *
      * @param mutations the Mutations to be applied across the replicas
      * @param consistency_level the consistency level for the operation
-     * @param requireQuorumForRemove at least a quorum of nodes will see update before deleting batchlog
      */
-    public static void mutateAtomically(Collection<Mutation> mutations,
-                                        ConsistencyLevel consistency_level,
-                                        boolean requireQuorumForRemove)
+    public static void mutateAtomically(Collection<Mutation> mutations, ConsistencyLevel consistency_level)
     throws UnavailableException, OverloadedException, WriteTimeoutException
     {
         Tracing.trace("Determining replicas for atomic batch");
@@ -744,49 +658,25 @@ public class StorageProxy implements StorageProxyMBean
 
         try
         {
-
-            // If we are requiring quorum nodes for removal, we upgrade consistency level to QUORUM unless we already
-            // require ALL, or EACH_QUORUM. This is so that *at least* QUORUM nodes see the update.
-            ConsistencyLevel batchConsistencyLevel = requireQuorumForRemove
-                                                     ? ConsistencyLevel.QUORUM
-                                                     : consistency_level;
-
-            switch (consistency_level)
-            {
-                case ALL:
-                case EACH_QUORUM:
-                    batchConsistencyLevel = consistency_level;
-            }
-
-            final Collection<InetAddress> batchlogEndpoints = getBatchlogEndpoints(localDataCenter, batchConsistencyLevel);
-            final UUID batchUUID = UUIDGen.getTimeUUID();
-            BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(),
-                                                                                          new BatchlogResponseHandler.BatchlogCleanupCallback()
-                                                                                          {
-                                                                                              public void invoke()
-                                                                                              {
-                                                                                                  asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID);
-                                                                                              }
-                                                                                          });
-
             // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
             for (Mutation mutation : mutations)
             {
-                WriteResponseHandlerWrapper wrapper = wrapBatchResponseHandler(mutation,
-                                                                               consistency_level,
-                                                                               batchConsistencyLevel,
-                                                                               WriteType.BATCH,
-                                                                               cleanup);
+                WriteResponseHandlerWrapper wrapper = wrapResponseHandler(mutation, consistency_level, WriteType.BATCH);
                 // exit early if we can't fulfill the CL at this time.
                 wrapper.handler.assureSufficientLiveNodes();
                 wrappers.add(wrapper);
             }
 
             // write to the batchlog
+            Collection<InetAddress> batchlogEndpoints = getBatchlogEndpoints(localDataCenter, consistency_level);
+            UUID batchUUID = UUIDGen.getTimeUUID();
             syncWriteToBatchlog(mutations, batchlogEndpoints, batchUUID);
 
             // now actually perform the writes and wait for them to complete
-            syncWriteBatchedMutations(wrappers, localDataCenter, Stage.MUTATION);
+            syncWriteBatchedMutations(wrappers, localDataCenter);
+
+            // remove the batchlog entries asynchronously
+            asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID);
         }
         catch (UnavailableException e)
         {
@@ -829,13 +719,13 @@ public class StorageProxy implements StorageProxyMBean
                                                                        WriteType.BATCH_LOG);
 
         MessageOut<Mutation> message = BatchlogManager.getBatchlogMutationFor(mutations, uuid, MessagingService.current_version)
-                                                      .createMessage(MessagingService.Verb.BATCHLOG_MUTATION);
+                                                      .createMessage();
         for (InetAddress target : endpoints)
         {
             int targetVersion = MessagingService.instance().getVersion(target);
             if (canDoLocalRequest(target))
             {
-                insertLocal(Stage.BATCHLOG_MUTATION, message.payload, handler);
+                insertLocal(message.payload, handler);
             }
             else if (targetVersion == MessagingService.current_version)
             {
@@ -844,7 +734,7 @@ public class StorageProxy implements StorageProxyMBean
             else
             {
                 MessagingService.instance().sendRR(BatchlogManager.getBatchlogMutationFor(mutations, uuid, targetVersion)
-                                                                  .createMessage(MessagingService.Verb.BATCHLOG_MUTATION),
+                                                                  .createMessage(),
                                                    target,
                                                    handler,
                                                    false);
@@ -864,43 +754,25 @@ public class StorageProxy implements StorageProxyMBean
                                                                        WriteType.SIMPLE);
         Mutation mutation = new Mutation(SystemKeyspace.NAME, StorageService.getPartitioner().decorateKey(UUIDType.instance.decompose(uuid)));
         mutation.add(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batchlog, mutation.key(), FBUtilities.timestampMicros(), FBUtilities.nowInSeconds()));
-        MessageOut<Mutation> message = mutation.createMessage(MessagingService.Verb.BATCHLOG_MUTATION);
+        MessageOut<Mutation> message = mutation.createMessage();
         for (InetAddress target : endpoints)
         {
             if (canDoLocalRequest(target))
-                insertLocal(Stage.BATCHLOG_MUTATION, message.payload, handler);
+                insertLocal(message.payload, handler);
             else
                 MessagingService.instance().sendRR(message, target, handler, false);
         }
     }
 
-    private static void asyncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, String localDataCenter, Stage stage)
-    {
-        for (WriteResponseHandlerWrapper wrapper : wrappers)
-        {
-            Iterable<InetAddress> endpoints = Iterables.concat(wrapper.handler.naturalEndpoints, wrapper.handler.pendingEndpoints);
-
-            try
-            {
-                sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter, stage);
-            }
-            catch (OverloadedException | WriteTimeoutException e)
-            {
-                wrapper.handler.onFailure(FBUtilities.getBroadcastAddress());
-            }
-        }
-    }
-
-    private static void syncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, String localDataCenter, Stage stage)
+    private static void syncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, String localDataCenter)
     throws WriteTimeoutException, OverloadedException
     {
         for (WriteResponseHandlerWrapper wrapper : wrappers)
         {
             Iterable<InetAddress> endpoints = Iterables.concat(wrapper.handler.naturalEndpoints, wrapper.handler.pendingEndpoints);
-            sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter, stage);
+            sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter);
         }
 
-
         for (WriteResponseHandlerWrapper wrapper : wrappers)
             wrapper.handler.get();
     }
@@ -943,52 +815,25 @@ public class StorageProxy implements StorageProxyMBean
         return responseHandler;
     }
 
-    // same as performWrites except does not initiate writes (but does perform availability checks).
-    private static WriteResponseHandlerWrapper wrapBatchResponseHandler(Mutation mutation,
-                                                                        ConsistencyLevel consistency_level,
-                                                                        ConsistencyLevel batchConsistencyLevel,
-                                                                        WriteType writeType,
-                                                                        BatchlogResponseHandler.BatchlogCleanup cleanup)
+    // same as above except does not initiate writes (but does perform availability checks).
+    private static WriteResponseHandlerWrapper wrapResponseHandler(Mutation mutation, ConsistencyLevel consistency_level, WriteType writeType)
     {
-        Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
-        AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
+        AbstractReplicationStrategy rs = Keyspace.open(mutation.getKeyspaceName()).getReplicationStrategy();
         String keyspaceName = mutation.getKeyspaceName();
         Token tk = mutation.key().getToken();
         List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
         Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
-        AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType);
-        BatchlogResponseHandler<IMutation> batchHandler = new BatchlogResponseHandler<>(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup);
-        return new WriteResponseHandlerWrapper(batchHandler, mutation);
-    }
-
-    /**
-     * Same as performWrites except does not initiate writes (but does perform availability checks).
-     * Keeps track of MVWriteMetrics
-     */
-    private static WriteResponseHandlerWrapper wrapMVBatchResponseHandler(Mutation mutation,
-                                                                          ConsistencyLevel consistency_level,
-                                                                          ConsistencyLevel batchConsistencyLevel,
-                                                                          List<InetAddress> naturalEndpoints,
-                                                                          WriteType writeType,
-                                                                          BatchlogResponseHandler.BatchlogCleanup cleanup)
-    {
-        Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
-        AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
-        String keyspaceName = mutation.getKeyspaceName();
-        Token tk = mutation.key().getToken();
-        Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
-        AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType);
-        BatchlogResponseHandler<IMutation> batchHandler = new MVWriteMetricsWrapped(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup);
-        return new WriteResponseHandlerWrapper(batchHandler, mutation);
+        AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType);
+        return new WriteResponseHandlerWrapper(responseHandler, mutation);
     }
 
     // used by atomic_batch_mutate to decouple availability check from the write itself, caches consistency level and endpoints.
     private static class WriteResponseHandlerWrapper
     {
-        final BatchlogResponseHandler<IMutation> handler;
+        final AbstractWriteResponseHandler<IMutation> handler;
         final Mutation mutation;
 
-        WriteResponseHandlerWrapper(BatchlogResponseHandler<IMutation> handler, Mutation mutation)
+        WriteResponseHandlerWrapper(AbstractWriteResponseHandler<IMutation> handler, Mutation mutation)
         {
             this.handler = handler;
             this.mutation = mutation;
@@ -1041,8 +886,7 @@ public class StorageProxy implements StorageProxyMBean
     public static void sendToHintedEndpoints(final Mutation mutation,
                                              Iterable<InetAddress> targets,
                                             AbstractWriteResponseHandler<IMutation> responseHandler,
-                                             String localDataCenter,
-                                             Stage stage)
+                                             String localDataCenter)
     throws OverloadedException
     {
         // extra-datacenter replicas, grouped by dc
@@ -1106,7 +950,7 @@ public class StorageProxy implements StorageProxyMBean
         }
 
         if (insertLocal)
-            insertLocal(stage, mutation, responseHandler);
+            insertLocal(mutation, responseHandler);
 
         if (dcGroups != null)
         {
@@ -1215,9 +1059,10 @@ public class StorageProxy implements StorageProxyMBean
         }
     }
 
-    private static void insertLocal(Stage stage, final Mutation mutation, final AbstractWriteResponseHandler<IMutation> responseHandler)
+    private static void insertLocal(final Mutation mutation, final AbstractWriteResponseHandler<IMutation> responseHandler)
     {
-        StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable()
+
+        StageManager.getStage(Stage.MUTATION).maybeExecuteImmediately(new LocalMutationRunnable()
         {
             public void runMayThrow()
             {
@@ -1228,8 +1073,7 @@ public class StorageProxy implements StorageProxyMBean
                 }
                 catch (Exception ex)
                 {
-                    if (!(ex instanceof WriteTimeoutException))
-                        logger.error("Failed to apply mutation locally : {}", ex);
+                    logger.error("Failed to apply mutation locally : {}", ex);
                     responseHandler.onFailure(FBUtilities.getBroadcastAddress());
                 }
             }
@@ -1349,7 +1193,7 @@ public class StorageProxy implements StorageProxyMBean
                 Set<InetAddress> remotes = Sets.difference(ImmutableSet.copyOf(targets),
                                                            ImmutableSet.of(FBUtilities.getBroadcastAddress()));
                 if (!remotes.isEmpty())
-                    sendToHintedEndpoints(result, remotes, responseHandler, localDataCenter, Stage.COUNTER_MUTATION);
+                    sendToHintedEndpoints(result, remotes, responseHandler, localDataCenter);
             }
         };
     }
@@ -2279,24 +2123,6 @@ public class StorageProxy implements StorageProxyMBean
     }
 
     /**
-     * This class captures metrics for materialized views writes.
-     */
-    private static class MVWriteMetricsWrapped extends BatchlogResponseHandler<IMutation>
-    {
-        public MVWriteMetricsWrapped(AbstractWriteResponseHandler<IMutation> writeHandler, int i, BatchlogCleanup cleanup)
-        {
-            super(writeHandler, i, cleanup);
-            mvWriteMetrics.viewReplicasAttempted.inc(totalEndpoints());
-        }
-
-        public void response(MessageIn<IMutation> msg)
-        {
-            super.response(msg);
-            mvWriteMetrics.viewReplicasSuccess.inc();
-        }
-    }
-
-    /**
      * A Runnable that aborts if it doesn't start running before it times out
      */
     private static abstract class DroppableRunnable implements Runnable

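With the materialized-view paths removed, mutateAtomically is back to the classic atomic-batch sequence: check availability for every mutation, write the batch to the batchlog synchronously, perform the mutations synchronously, then clean the batchlog up asynchronously. A sketch of that ordering; every type and method below is an illustrative stub, not a Cassandra API:

import java.util.List;
import java.util.UUID;

public class AtomicBatchFlowSketch
{
    static void mutateAtomically(List<String> mutations)
    {
        for (String mutation : mutations)
            assureSufficientLiveNodes(mutation); // fail fast, before anything is written

        UUID batchUUID = UUID.randomUUID();
        writeToBatchlog(mutations, batchUUID);   // synchronous: the batch is replayable from here on
        writeMutations(mutations);               // synchronous: wait for the required acks
        removeFromBatchlog(batchUUID);           // async cleanup: a replay is now merely redundant
    }

    static void assureSufficientLiveNodes(String mutation) {}
    static void writeToBatchlog(List<String> mutations, UUID id) { System.out.println("batchlog write " + id); }
    static void writeMutations(List<String> mutations) { mutations.forEach(m -> System.out.println("apply " + m)); }
    static void removeFromBatchlog(UUID id) { System.out.println("batchlog remove " + id); }

    public static void main(String[] args)
    {
        mutateAtomically(List.of("m1", "m2"));
    }
}
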
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 5049337..da53bf7 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -297,8 +297,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
 
         /* register the verb handlers */
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MUTATION, new MutationVerbHandler());
-        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCHLOG_MUTATION, new MutationVerbHandler());
-        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MATERIALIZED_VIEW_MUTATION, new MutationVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ_REPAIR, new ReadRepairVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ, new ReadCommandVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.RANGE_SLICE, new ReadCommandVerbHandler());
@@ -631,14 +629,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             public void runMayThrow() throws InterruptedException
             {
                 inShutdownHook = true;
-                ExecutorService materializedViewMutationStage = StageManager.getStage(Stage.MATERIALIZED_VIEW_MUTATION);
-                ExecutorService batchlogMutationStage = StageManager.getStage(Stage.BATCHLOG_MUTATION);
                 ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
                 ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
-                if (mutationStage.isShutdown()
-                    && counterMutationStage.isShutdown()
-                    && batchlogMutationStage.isShutdown()
-                    && materializedViewMutationStage.isShutdown())
+                if (mutationStage.isShutdown() && counterMutationStage.isShutdown())
                     return; // drained already
 
                 if (daemon != null)
@@ -649,12 +642,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 // In-progress writes originating here could generate hints to be written, so shut down MessagingService
                 // before mutation stage, so we can get all the hints saved before shutting down
                 MessagingService.instance().shutdown();
-                materializedViewMutationStage.shutdown();
-                batchlogMutationStage.shutdown();
                 counterMutationStage.shutdown();
                 mutationStage.shutdown();
-                materializedViewMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
-                batchlogMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
                 counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
                 mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
                 StorageProxy.instance.verifyNoHintsInProgress();
@@ -3831,13 +3820,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         inShutdownHook = true;
 
         ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
-        ExecutorService batchlogMutationStage = StageManager.getStage(Stage.BATCHLOG_MUTATION);
-        ExecutorService materializedViewMutationStage = StageManager.getStage(Stage.MATERIALIZED_VIEW_MUTATION);
         ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
-        if (mutationStage.isTerminated()
-            && counterMutationStage.isTerminated()
-            && batchlogMutationStage.isTerminated()
-            && materializedViewMutationStage.isTerminated())
+        if (mutationStage.isTerminated() && counterMutationStage.isTerminated())
         {
             logger.warn("Cannot drain node (did it already happen?)");
             return;
@@ -3851,12 +3835,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         MessagingService.instance().shutdown();
 
         setMode(Mode.DRAINING, "clearing mutation stage", false);
-        materializedViewMutationStage.shutdown();
-        batchlogMutationStage.shutdown();
         counterMutationStage.shutdown();
         mutationStage.shutdown();
-        materializedViewMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
-        batchlogMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
         counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
         mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
 
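The StorageService hunks above shrink the shutdown hook and drain path back to two write stages, but the ordering they preserve is the important part: stop inbound messaging first (so in-progress writes can still save their hints), then stop the stages, then wait for queued work to finish. A standalone sketch of that ordering, with plain executors standing in for Cassandra's MUTATION and COUNTER_MUTATION stages:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DrainOrderSketch
{
    public static void main(String[] args) throws InterruptedException
    {
        ExecutorService mutationStage = Executors.newFixedThreadPool(4);
        ExecutorService counterMutationStage = Executors.newFixedThreadPool(2);

        // 1. In the real code, MessagingService.instance().shutdown() runs first,
        //    so no new remote writes arrive while the stages drain.

        // 2. Stop accepting new local tasks.
        counterMutationStage.shutdown();
        mutationStage.shutdown();

        // 3. Wait for in-flight and queued work before declaring the node drained.
        counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
        mutationStage.awaitTermination(3600, TimeUnit.SECONDS);

        System.out.println("drained");
    }
}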