http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/view/MaterializedViewUtils.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewUtils.java b/src/java/org/apache/cassandra/db/view/MaterializedViewUtils.java deleted file mode 100644 index 4dfea75..0000000 --- a/src/java/org/apache/cassandra/db/view/MaterializedViewUtils.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.db.view; - -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.List; - -import org.apache.cassandra.config.DatabaseDescriptor; -import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.dht.Token; -import org.apache.cassandra.locator.AbstractReplicationStrategy; -import org.apache.cassandra.utils.FBUtilities; - -public final class MaterializedViewUtils -{ - private MaterializedViewUtils() - { - } - - /** - * Calculate the natural endpoint for the view. - * - * The view natural endpoint is the endpint which has the same cardinality as this node in the replication factor. - * The cardinality is the number at which this node would store a piece of data, given the change in replication - * factor. - * - * For example, if we have the following ring: - * A, T1 -> B, T2 -> C, T3 -> A - * - * For the token T1, at RF=1, A would be included, so A's cardinality for T1 is 1. For the token T1, at RF=2, B would - * be included, so B's cardinality for token T1 is 2. For token T3, at RF = 2, A would be included, so A's cardinality - * for T3 is 2. - * - * For a view whose base token is T1 and whose view token is T3, the pairings between the nodes would be: - * A writes to C (A's cardinality is 1 for T1, and C's cardinality is 1 for T3) - * B writes to A (B's cardinality is 2 for T1, and A's cardinality is 2 for T3) - * C writes to B (C's cardinality is 3 for T1, and B's cardinality is 3 for T3) - * - * @throws RuntimeException if this method is called using a base token which does not belong to this replica - */ - public static InetAddress getViewNaturalEndpoint(String keyspaceName, Token baseToken, Token viewToken) - { - AbstractReplicationStrategy replicationStrategy = Keyspace.open(keyspaceName).getReplicationStrategy(); - - String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); - List<InetAddress> localBaseEndpoints = new ArrayList<>(); - List<InetAddress> localViewEndpoints = new ArrayList<>(); - for (InetAddress baseEndpoint : replicationStrategy.getNaturalEndpoints(baseToken)) - { - if (DatabaseDescriptor.getEndpointSnitch().getDatacenter(baseEndpoint).equals(localDataCenter)) - localBaseEndpoints.add(baseEndpoint); - } - - for (InetAddress viewEndpoint : replicationStrategy.getNaturalEndpoints(viewToken)) - { - // If we are a base endpoint which is also a view replica, we use ourselves as our view replica - if (viewEndpoint.equals(FBUtilities.getBroadcastAddress())) - return viewEndpoint; - - // We have to remove any endpoint which is shared between the base and the view, as it will select itself - // and throw off the counts otherwise. - if (localBaseEndpoints.contains(viewEndpoint)) - localBaseEndpoints.remove(viewEndpoint); - else if (DatabaseDescriptor.getEndpointSnitch().getDatacenter(viewEndpoint).equals(localDataCenter)) - localViewEndpoints.add(viewEndpoint); - } - - // The replication strategy will be the same for the base and the view, as they must belong to the same keyspace. - // Since the same replication strategy is used, the same placement should be used and we should get the same - // number of replicas for all of the tokens in the ring. - assert localBaseEndpoints.size() == localViewEndpoints.size() : "Replication strategy should have the same number of endpoints for the base and the view"; - int baseIdx = localBaseEndpoints.indexOf(FBUtilities.getBroadcastAddress()); - if (baseIdx < 0) - throw new RuntimeException("Trying to get the view natural endpoint on a non-data replica"); - - return localViewEndpoints.get(baseIdx); - } -}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/db/view/TemporalRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/TemporalRow.java b/src/java/org/apache/cassandra/db/view/TemporalRow.java deleted file mode 100644 index 53e4e91..0000000 --- a/src/java/org/apache/cassandra/db/view/TemporalRow.java +++ /dev/null @@ -1,414 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.db.view; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.SortedMap; -import java.util.TreeMap; - -import com.google.common.collect.Iterables; - -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ColumnDefinition; -import org.apache.cassandra.cql3.ColumnIdentifier; -import org.apache.cassandra.db.CBuilder; -import org.apache.cassandra.db.Clustering; -import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.Conflicts; -import org.apache.cassandra.db.DecoratedKey; -import org.apache.cassandra.db.DeletionInfo; -import org.apache.cassandra.db.DeletionTime; -import org.apache.cassandra.db.LivenessInfo; -import org.apache.cassandra.db.RangeTombstone; -import org.apache.cassandra.db.Slice; -import org.apache.cassandra.db.marshal.CompositeType; -import org.apache.cassandra.db.partitions.AbstractThreadUnsafePartition; -import org.apache.cassandra.db.rows.BufferCell; -import org.apache.cassandra.db.rows.Cell; -import org.apache.cassandra.db.rows.CellPath; -import org.apache.cassandra.db.rows.Row; -import org.apache.cassandra.utils.ByteBufferUtil; -import org.apache.cassandra.utils.FBUtilities; - -/** - * Represents a single CQL Row in a base table, with both the currently persisted value and the update's value. The - * values are stored in timestamp order, but also indicate whether they are from the currently persisted, allowing a - * {@link TemporalRow.Resolver} to resolve if the value is an old value that has been updated; if it sorts after the - * update's value, then it does not qualify. - */ -public class TemporalRow -{ - private static final int NO_TTL = LivenessInfo.NO_TTL; - private static final long NO_TIMESTAMP = LivenessInfo.NO_TIMESTAMP; - private static final int NO_DELETION_TIME = DeletionTime.LIVE.localDeletionTime(); - - public interface Resolver - { - /** - * @param cells Iterable of all cells for a certain TemporalRow's Cell, in timestamp sorted order - * @return A single TemporalCell from the iterable which satisfies the resolution criteria, or null if - * there is no cell which qualifies - */ - TemporalCell resolve(Iterable<TemporalCell> cells); - } - - /** - * Returns the first value in the iterable if it is from the set of persisted cells, and the cell which results from - * reconciliation of the remaining cells does not have the same value. - */ - public static final Resolver oldValueIfUpdated = cells -> { - Iterator<TemporalCell> iterator = cells.iterator(); - if (!iterator.hasNext()) - return null; - - TemporalCell initial = iterator.next(); - if (initial.isNew || !iterator.hasNext()) - return null; - - TemporalCell value = initial; - while (iterator.hasNext()) - value = value.reconcile(iterator.next()); - - return ByteBufferUtil.compareUnsigned(initial.value, value.value) != 0 ? initial : null; - }; - - public static final Resolver earliest = cells -> { - Iterator<TemporalCell> iterator = cells.iterator(); - if (!iterator.hasNext()) - return null; - return iterator.next(); - }; - - public static final Resolver latest = cells -> { - Iterator<TemporalCell> iterator = cells.iterator(); - if (!iterator.hasNext()) - return null; - - TemporalCell value = iterator.next(); - while (iterator.hasNext()) - value = value.reconcile(iterator.next()); - - return value; - }; - - private static class TemporalCell - { - public final ByteBuffer value; - public final long timestamp; - public final int ttl; - public final int localDeletionTime; - public final boolean isNew; - - private TemporalCell(ByteBuffer value, long timestamp, int ttl, int localDeletionTime, boolean isNew) - { - this.value = value; - this.timestamp = timestamp; - this.ttl = ttl; - this.localDeletionTime = localDeletionTime; - this.isNew = isNew; - } - - public TemporalCell reconcile(TemporalCell that) - { - int now = FBUtilities.nowInSeconds(); - Conflicts.Resolution resolution = Conflicts.resolveRegular(that.timestamp, - that.isLive(now), - that.localDeletionTime, - that.value, - this.timestamp, - this.isLive(now), - this.localDeletionTime, - this.value); - assert resolution != Conflicts.Resolution.MERGE; - if (resolution == Conflicts.Resolution.LEFT_WINS) - return that; - return this; - } - - private boolean isLive(int now) - { - return localDeletionTime == NO_DELETION_TIME || (ttl != NO_TTL && now < localDeletionTime); - } - - public Cell cell(ColumnDefinition definition, CellPath cellPath) - { - return new BufferCell(definition, timestamp, ttl, localDeletionTime, value, cellPath); - } - } - - private final ColumnFamilyStore baseCfs; - private final java.util.Set<ColumnIdentifier> viewPrimaryKey; - private final ByteBuffer basePartitionKey; - public final Map<ColumnIdentifier, ByteBuffer> clusteringColumns; - public final int nowInSec; - private final Map<ColumnIdentifier, Map<CellPath, SortedMap<Long, TemporalCell>>> columnValues = new HashMap<>(); - private int viewClusteringTtl = NO_TTL; - private long viewClusteringTimestamp = NO_TIMESTAMP; - private int viewClusteringLocalDeletionTime = NO_DELETION_TIME; - - TemporalRow(ColumnFamilyStore baseCfs, java.util.Set<ColumnIdentifier> viewPrimaryKey, ByteBuffer key, Row row, int nowInSec, boolean isNew) - { - this.baseCfs = baseCfs; - this.viewPrimaryKey = viewPrimaryKey; - this.basePartitionKey = key; - this.nowInSec = nowInSec; - clusteringColumns = new HashMap<>(); - LivenessInfo liveness = row.primaryKeyLivenessInfo(); - this.viewClusteringLocalDeletionTime = minValueIfSet(viewClusteringLocalDeletionTime, row.deletion().localDeletionTime(), NO_DELETION_TIME); - this.viewClusteringTimestamp = minValueIfSet(viewClusteringTimestamp, liveness.timestamp(), NO_TIMESTAMP); - this.viewClusteringTtl = minValueIfSet(viewClusteringTtl, liveness.ttl(), NO_TTL); - - List<ColumnDefinition> clusteringDefs = baseCfs.metadata.clusteringColumns(); - for (int i = 0; i < clusteringDefs.size(); i++) - { - ColumnDefinition cdef = clusteringDefs.get(i); - clusteringColumns.put(cdef.name, row.clustering().get(i)); - - addColumnValue(cdef.name, null, NO_TIMESTAMP, NO_TTL, NO_DELETION_TIME, row.clustering().get(i), isNew); - } - } - - @Override - public boolean equals(Object o) - { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - - TemporalRow that = (TemporalRow) o; - - if (!clusteringColumns.equals(that.clusteringColumns)) return false; - if (!basePartitionKey.equals(that.basePartitionKey)) return false; - - return true; - } - - @Override - public int hashCode() - { - int result = basePartitionKey.hashCode(); - result = 31 * result + clusteringColumns.hashCode(); - return result; - } - - public void addColumnValue(ColumnIdentifier identifier, - CellPath cellPath, - long timestamp, - int ttl, - int localDeletionTime, - ByteBuffer value, boolean isNew) - { - if (!columnValues.containsKey(identifier)) - columnValues.put(identifier, new HashMap<>()); - - Map<CellPath, SortedMap<Long, TemporalCell>> innerMap = columnValues.get(identifier); - - if (!innerMap.containsKey(cellPath)) - innerMap.put(cellPath, new TreeMap<>()); - - // If this column is part of the view's primary keys - if (viewPrimaryKey.contains(identifier)) - { - this.viewClusteringTtl = minValueIfSet(this.viewClusteringTtl, ttl, NO_TTL); - this.viewClusteringTimestamp = minValueIfSet(this.viewClusteringTimestamp, timestamp, NO_TIMESTAMP); - this.viewClusteringLocalDeletionTime = minValueIfSet(this.viewClusteringLocalDeletionTime, localDeletionTime, NO_DELETION_TIME); - } - - innerMap.get(cellPath).put(timestamp, new TemporalCell(value, timestamp, ttl, localDeletionTime, isNew)); - } - - private static int minValueIfSet(int existing, int update, int defaultValue) - { - if (existing == defaultValue) - return update; - if (update == defaultValue) - return existing; - return Math.min(existing, update); - } - - private static long minValueIfSet(long existing, long update, long defaultValue) - { - if (existing == defaultValue) - return update; - if (update == defaultValue) - return existing; - return Math.min(existing, update); - } - - public int viewClusteringTtl() - { - return viewClusteringTtl; - } - - public long viewClusteringTimestamp() - { - return viewClusteringTimestamp; - } - - public int viewClusteringLocalDeletionTime() - { - return viewClusteringLocalDeletionTime; - } - - public void addCell(Cell cell, boolean isNew) - { - addColumnValue(cell.column().name, cell.path(), cell.timestamp(), cell.ttl(), cell.localDeletionTime(), cell.value(), isNew); - } - - // The Definition here is actually the *base table* definition - public ByteBuffer clusteringValue(ColumnDefinition definition, Resolver resolver) - { - ColumnDefinition baseDefinition = definition.cfName.equals(baseCfs.name) - ? definition - : baseCfs.metadata.getColumnDefinition(definition.name); - - if (baseDefinition.isPartitionKey()) - { - if (baseDefinition.isOnAllComponents()) - return basePartitionKey; - else - { - CompositeType keyComparator = (CompositeType) baseCfs.metadata.getKeyValidator(); - ByteBuffer[] components = keyComparator.split(basePartitionKey); - return components[baseDefinition.position()]; - } - } - else - { - ColumnIdentifier columnIdentifier = baseDefinition.name; - - if (clusteringColumns.containsKey(columnIdentifier)) - return clusteringColumns.get(columnIdentifier); - - Collection<org.apache.cassandra.db.rows.Cell> val = values(definition, resolver); - if (val != null && val.size() == 1) - return Iterables.getOnlyElement(val).value(); - } - return null; - } - - public DeletionTime deletionTime(AbstractThreadUnsafePartition partition) - { - DeletionInfo deletionInfo = partition.deletionInfo(); - if (!deletionInfo.getPartitionDeletion().isLive()) - return deletionInfo.getPartitionDeletion(); - - Clustering baseClustering = baseClusteringBuilder().build(); - RangeTombstone clusterTombstone = deletionInfo.rangeCovering(baseClustering); - if (clusterTombstone != null) - return clusterTombstone.deletionTime(); - - Row row = partition.getRow(baseClustering); - return row == null || row.deletion().isLive() ? DeletionTime.LIVE : row.deletion(); - } - - public Collection<org.apache.cassandra.db.rows.Cell> values(ColumnDefinition definition, Resolver resolver) - { - Map<CellPath, SortedMap<Long, TemporalCell>> innerMap = columnValues.get(definition.name); - if (innerMap == null) - { - return Collections.emptyList(); - } - - Collection<org.apache.cassandra.db.rows.Cell> value = new ArrayList<>(); - for (Map.Entry<CellPath, SortedMap<Long, TemporalCell>> pathAndCells : innerMap.entrySet()) - { - TemporalCell cell = resolver.resolve(pathAndCells.getValue().values()); - - if (cell != null) - value.add(cell.cell(definition, pathAndCells.getKey())); - } - return value; - } - - public Slice baseSlice() - { - return baseClusteringBuilder().buildSlice(); - } - - private CBuilder baseClusteringBuilder() - { - CFMetaData metadata = baseCfs.metadata; - CBuilder builder = CBuilder.create(metadata.comparator); - - ByteBuffer[] buffers = new ByteBuffer[clusteringColumns.size()]; - for (Map.Entry<ColumnIdentifier, ByteBuffer> buffer : clusteringColumns.entrySet()) - buffers[metadata.getColumnDefinition(buffer.getKey()).position()] = buffer.getValue(); - - for (ByteBuffer byteBuffer : buffers) - builder = builder.add(byteBuffer); - - return builder; - } - - static class Set implements Iterable<TemporalRow> - { - private final ColumnFamilyStore baseCfs; - private final java.util.Set<ColumnIdentifier> viewPrimaryKey; - private final ByteBuffer key; - public final DecoratedKey dk; - private final Map<Clustering, TemporalRow> clusteringToRow; - final int nowInSec = FBUtilities.nowInSeconds(); - - Set(ColumnFamilyStore baseCfs, java.util.Set<ColumnIdentifier> viewPrimaryKey, ByteBuffer key) - { - this.baseCfs = baseCfs; - this.viewPrimaryKey = viewPrimaryKey; - this.key = key; - this.dk = baseCfs.partitioner.decorateKey(key); - this.clusteringToRow = new HashMap<>(); - } - - public Iterator<TemporalRow> iterator() - { - return clusteringToRow.values().iterator(); - } - - public TemporalRow getClustering(Clustering clustering) - { - return clusteringToRow.get(clustering); - } - - public void addRow(Row row, boolean isNew) - { - TemporalRow temporalRow = clusteringToRow.get(row.clustering()); - if (temporalRow == null) - { - temporalRow = new TemporalRow(baseCfs, viewPrimaryKey, key, row, nowInSec, isNew); - clusteringToRow.put(row.clustering(), temporalRow); - } - - for (Cell cell: row.cells()) - { - temporalRow.addCell(cell, isNew); - } - } - - public int size() - { - return clusteringToRow.size(); - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java index 9c886b0..8f52bb1 100644 --- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java +++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java @@ -87,7 +87,7 @@ public class KeyspaceMetrics public final LatencyMetrics casPropose; /** CAS Commit metrics */ public final LatencyMetrics casCommit; - + public final MetricNameFactory factory; private Keyspace keyspace; http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/metrics/MVWriteMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/MVWriteMetrics.java b/src/java/org/apache/cassandra/metrics/MVWriteMetrics.java deleted file mode 100644 index 39a5574..0000000 --- a/src/java/org/apache/cassandra/metrics/MVWriteMetrics.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.metrics; - -import com.codahale.metrics.Counter; - -import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; - -public class MVWriteMetrics extends ClientRequestMetrics -{ - public final Counter viewReplicasAttempted; - public final Counter viewReplicasSuccess; - - public MVWriteMetrics(String scope) { - super(scope); - viewReplicasAttempted = Metrics.counter(factory.createMetricName("ViewReplicasAttempted")); - viewReplicasSuccess = Metrics.counter(factory.createMetricName("ViewReplicasSuccess")); - } - - public void release() - { - super.release(); - Metrics.remove(factory.createMetricName("ViewReplicasAttempted")); - Metrics.remove(factory.createMetricName("ViewReplicasSuccess")); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 4f15da2..a5f3601 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -134,8 +134,6 @@ public final class MessagingService implements MessagingServiceMBean PAXOS_PROPOSE, PAXOS_COMMIT, @Deprecated PAGED_RANGE, - BATCHLOG_MUTATION, - MATERIALIZED_VIEW_MUTATION, // remember to add new verbs at the end, since we serialize by ordinal UNUSED_1, UNUSED_2, @@ -147,8 +145,6 @@ public final class MessagingService implements MessagingServiceMBean {{ put(Verb.MUTATION, Stage.MUTATION); put(Verb.COUNTER_MUTATION, Stage.COUNTER_MUTATION); - put(Verb.MATERIALIZED_VIEW_MUTATION, Stage.MATERIALIZED_VIEW_MUTATION); - put(Verb.BATCHLOG_MUTATION, Stage.BATCHLOG_MUTATION); put(Verb.READ_REPAIR, Stage.MUTATION); put(Verb.TRUNCATE, Stage.MUTATION); put(Verb.PAXOS_PREPARE, Stage.MUTATION); @@ -207,8 +203,6 @@ public final class MessagingService implements MessagingServiceMBean put(Verb.INTERNAL_RESPONSE, CallbackDeterminedSerializer.instance); put(Verb.MUTATION, Mutation.serializer); - put(Verb.BATCHLOG_MUTATION, Mutation.serializer); - put(Verb.MATERIALIZED_VIEW_MUTATION, Mutation.serializer); put(Verb.READ_REPAIR, Mutation.serializer); put(Verb.READ, ReadCommand.serializer); //put(Verb.RANGE_SLICE, ReadCommand.legacyRangeSliceCommandSerializer); @@ -235,8 +229,6 @@ public final class MessagingService implements MessagingServiceMBean public static final EnumMap<Verb, IVersionedSerializer<?>> callbackDeserializers = new EnumMap<Verb, IVersionedSerializer<?>>(Verb.class) {{ put(Verb.MUTATION, WriteResponse.serializer); - put(Verb.BATCHLOG_MUTATION, WriteResponse.serializer); - put(Verb.MATERIALIZED_VIEW_MUTATION, WriteResponse.serializer); put(Verb.READ_REPAIR, WriteResponse.serializer); put(Verb.COUNTER_MUTATION, WriteResponse.serializer); put(Verb.RANGE_SLICE, ReadResponse.legacyRangeSliceReplySerializer); @@ -299,8 +291,6 @@ public final class MessagingService implements MessagingServiceMBean */ public static final EnumSet<Verb> DROPPABLE_VERBS = EnumSet.of(Verb._TRACE, Verb.MUTATION, - Verb.BATCHLOG_MUTATION, //FIXME: should this be droppable?? - Verb.MATERIALIZED_VIEW_MUTATION, Verb.COUNTER_MUTATION, Verb.READ_REPAIR, Verb.READ, @@ -628,10 +618,7 @@ public final class MessagingService implements MessagingServiceMBean ConsistencyLevel consistencyLevel, boolean allowHints) { - assert message.verb == Verb.MUTATION - || message.verb == Verb.BATCHLOG_MUTATION - || message.verb == Verb.MATERIALIZED_VIEW_MUTATION - || message.verb == Verb.COUNTER_MUTATION; + assert message.verb == Verb.MUTATION || message.verb == Verb.COUNTER_MUTATION; int messageId = nextId(); CallbackInfo previous = callbacks.put(messageId, http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java index 1c21e41..7326fa9 100644 --- a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java +++ b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java @@ -307,9 +307,7 @@ public final class LegacySchemaMigrator defaultValidator); } - // The legacy schema did not have views, so we know that we are not loading a materialized view - boolean isMaterializedView = false; - CFMetaData cfm = CFMetaData.create(ksName, cfName, cfId, isDense, isCompound, isSuper, isCounter, isMaterializedView, columnDefs); + CFMetaData cfm = CFMetaData.create(ksName, cfName, cfId, isDense, isCompound, isSuper, isCounter, columnDefs); cfm.readRepairChance(tableRow.getDouble("read_repair_chance")); cfm.dcLocalReadRepairChance(tableRow.getDouble("local_read_repair_chance")); http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/schema/MaterializedViews.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/MaterializedViews.java b/src/java/org/apache/cassandra/schema/MaterializedViews.java deleted file mode 100644 index 1c55736..0000000 --- a/src/java/org/apache/cassandra/schema/MaterializedViews.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.schema; - - -import java.util.Iterator; -import java.util.Optional; - -import com.google.common.collect.ImmutableMap; - -import org.apache.cassandra.config.MaterializedViewDefinition; - -import static com.google.common.collect.Iterables.filter; - -public final class MaterializedViews implements Iterable<MaterializedViewDefinition> -{ - private final ImmutableMap<String, MaterializedViewDefinition> materializedViews; - - private MaterializedViews(Builder builder) - { - materializedViews = builder.materializedViews.build(); - } - - public static Builder builder() - { - return new Builder(); - } - - public static MaterializedViews none() - { - return builder().build(); - } - - public Iterator<MaterializedViewDefinition> iterator() - { - return materializedViews.values().iterator(); - } - - public int size() - { - return materializedViews.size(); - } - - public boolean isEmpty() - { - return materializedViews.isEmpty(); - } - - /** - * Get the materialized view with the specified name - * - * @param name a non-qualified materialized view name - * @return an empty {@link Optional} if the materialized view name is not found; a non-empty optional of {@link MaterializedViewDefinition} otherwise - */ - public Optional<MaterializedViewDefinition> get(String name) - { - return Optional.ofNullable(materializedViews.get(name)); - } - - /** - * Create a MaterializedViews instance with the provided materialized view added - */ - public MaterializedViews with(MaterializedViewDefinition materializedView) - { - if (get(materializedView.viewName).isPresent()) - throw new IllegalStateException(String.format("Materialized View %s already exists", materializedView.viewName)); - - return builder().add(this).add(materializedView).build(); - } - - /** - * Creates a MaterializedViews instance with the materializedView with the provided name removed - */ - public MaterializedViews without(String name) - { - MaterializedViewDefinition materializedView = - get(name).orElseThrow(() -> new IllegalStateException(String.format("Materialized View %s doesn't exists", name))); - - return builder().add(filter(this, v -> v != materializedView)).build(); - } - - /** - * Creates a MaterializedViews instance which contains an updated materialized view - */ - public MaterializedViews replace(MaterializedViewDefinition materializedView) - { - return without(materializedView.viewName).with(materializedView); - } - - @Override - public boolean equals(Object o) - { - return this == o || (o instanceof MaterializedViews && materializedViews.equals(((MaterializedViews) o).materializedViews)); - } - - @Override - public int hashCode() - { - return materializedViews.hashCode(); - } - - @Override - public String toString() - { - return materializedViews.values().toString(); - } - - public static final class Builder - { - final ImmutableMap.Builder<String, MaterializedViewDefinition> materializedViews = new ImmutableMap.Builder<>(); - - private Builder() - { - } - - public MaterializedViews build() - { - return new MaterializedViews(this); - } - - public Builder add(MaterializedViewDefinition materializedView) - { - materializedViews.put(materializedView.viewName, materializedView); - return this; - } - - public Builder add(Iterable<MaterializedViewDefinition> materializedViews) - { - materializedViews.forEach(this::add); - return this; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/schema/SchemaKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java index 2150f4a..2bc7b0c 100644 --- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java +++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java @@ -76,13 +76,12 @@ public final class SchemaKeyspace public static final String COLUMNS = "columns"; public static final String DROPPED_COLUMNS = "dropped_columns"; public static final String TRIGGERS = "triggers"; - public static final String MATERIALIZED_VIEWS = "materialized_views"; public static final String TYPES = "types"; public static final String FUNCTIONS = "functions"; public static final String AGGREGATES = "aggregates"; public static final List<String> ALL = - ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, MATERIALIZED_VIEWS, TYPES, FUNCTIONS, AGGREGATES); + ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, TYPES, FUNCTIONS, AGGREGATES); private static final CFMetaData Keyspaces = compile(KEYSPACES, @@ -153,18 +152,6 @@ public final class SchemaKeyspace + "trigger_options map<text, text>," + "PRIMARY KEY ((keyspace_name), table_name, trigger_name))"); - private static final CFMetaData MaterializedViews = - compile(MATERIALIZED_VIEWS, - "materialized views definitions", - "CREATE TABLE %s (" - + "keyspace_name text," - + "table_name text," - + "view_name text," - + "target_columns list<text>," - + "clustering_columns list<text>," - + "included_columns list<text>," - + "PRIMARY KEY ((keyspace_name), table_name, view_name))"); - private static final CFMetaData Types = compile(TYPES, "user defined type definitions", @@ -206,7 +193,7 @@ public final class SchemaKeyspace + "PRIMARY KEY ((keyspace_name), aggregate_name, signature))"); public static final List<CFMetaData> All = - ImmutableList.of(Keyspaces, Tables, Columns, Triggers, DroppedColumns, MaterializedViews, Types, Functions, Aggregates); + ImmutableList.of(Keyspaces, Tables, Columns, DroppedColumns, Triggers, Types, Functions, Aggregates); private static CFMetaData compile(String name, String description, String schema) { @@ -701,8 +688,6 @@ public final class SchemaKeyspace Mutation mutation = new Mutation(NAME, getSchemaKSDecoratedKey(keyspace.name)); for (CFMetaData schemaTable : All) mutation.add(PartitionUpdate.fullPartitionDelete(schemaTable, mutation.key(), timestamp, nowInSec)); - mutation.add(PartitionUpdate.fullPartitionDelete(SystemKeyspace.BuiltMaterializedViews, mutation.key(), timestamp, nowInSec)); - mutation.add(PartitionUpdate.fullPartitionDelete(SystemKeyspace.MaterializedViewsBuildsInProgress, mutation.key(), timestamp, nowInSec)); return mutation; } @@ -845,9 +830,6 @@ public final class SchemaKeyspace for (TriggerMetadata trigger : table.getTriggers()) addTriggerToSchemaMutation(table, trigger, timestamp, mutation); - - for (MaterializedViewDefinition materializedView: table.getMaterializedViews()) - addMaterializedViewToSchemaMutation(table, materializedView, timestamp, mutation); } } @@ -941,22 +923,6 @@ public final class SchemaKeyspace for (TriggerMetadata trigger : triggerDiff.entriesOnlyOnRight().values()) addTriggerToSchemaMutation(newTable, trigger, timestamp, mutation); - MapDifference<String, MaterializedViewDefinition> materializedViewDiff = materializedViewsDiff(oldTable.getMaterializedViews(), newTable.getMaterializedViews()); - - // dropped materialized views - for (MaterializedViewDefinition materializedView : materializedViewDiff.entriesOnlyOnLeft().values()) - dropMaterializedViewFromSchemaMutation(oldTable, materializedView, timestamp, mutation); - - // newly created materialized views - for (MaterializedViewDefinition materializedView : materializedViewDiff.entriesOnlyOnRight().values()) - addMaterializedViewToSchemaMutation(oldTable, materializedView, timestamp, mutation); - - // updated materialized views need to be updated - for (MapDifference.ValueDifference<MaterializedViewDefinition> diff : materializedViewDiff.entriesDiffering().values()) - { - addUpdatedMaterializedViewDefinitionToSchemaMutation(oldTable, diff.rightValue(), timestamp, mutation); - } - return mutation; } @@ -971,17 +937,6 @@ public final class SchemaKeyspace return Maps.difference(beforeMap, afterMap); } - private static MapDifference<String, MaterializedViewDefinition> materializedViewsDiff(MaterializedViews before, MaterializedViews after) - { - Map<String, MaterializedViewDefinition> beforeMap = new HashMap<>(); - before.forEach(v -> beforeMap.put(v.viewName, v)); - - Map<String, MaterializedViewDefinition> afterMap = new HashMap<>(); - after.forEach(v -> afterMap.put(v.viewName, v)); - - return Maps.difference(beforeMap, afterMap); - } - public static Mutation makeDropTableMutation(KeyspaceMetadata keyspace, CFMetaData table, long timestamp) { // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). @@ -995,9 +950,6 @@ public final class SchemaKeyspace for (TriggerMetadata trigger : table.getTriggers()) dropTriggerFromSchemaMutation(table, trigger, timestamp, mutation); - for (MaterializedViewDefinition materializedView : table.getMaterializedViews()) - dropMaterializedViewFromSchemaMutation(table, materializedView, timestamp, mutation); - return mutation; } @@ -1062,12 +1014,8 @@ public final class SchemaKeyspace Triggers triggers = readSchemaPartitionForTableAndApply(TRIGGERS, keyspace, table, SchemaKeyspace::createTriggersFromTriggersPartition); - MaterializedViews views = - readSchemaPartitionForTableAndApply(MATERIALIZED_VIEWS, keyspace, table, SchemaKeyspace::createMaterializedViewsFromMaterializedViewsPartition); - return createTableFromTableRowAndColumns(row, columns).droppedColumns(droppedColumns) - .triggers(triggers) - .materializedViews(views); + .triggers(triggers); } public static CFMetaData createTableFromTableRowAndColumns(UntypedResultSet.Row row, List<ColumnDefinition> columns) @@ -1084,9 +1032,8 @@ public final class SchemaKeyspace boolean isCounter = flags.contains(CFMetaData.Flag.COUNTER); boolean isDense = flags.contains(CFMetaData.Flag.DENSE); boolean isCompound = flags.contains(CFMetaData.Flag.COMPOUND); - boolean isMaterializedView = flags.contains(CFMetaData.Flag.MATERIALIZEDVIEW); - CFMetaData cfm = CFMetaData.create(keyspace, table, id, isDense, isCompound, isSuper, isCounter, isMaterializedView, columns); + CFMetaData cfm = CFMetaData.create(keyspace, table, id, isDense, isCompound, isSuper, isCounter, columns); Map<String, String> compaction = new HashMap<>(row.getTextMap("compaction")); Class<? extends AbstractCompactionStrategy> compactionStrategyClass = @@ -1274,103 +1221,6 @@ public final class SchemaKeyspace } /* - * Global Index metadata serialization/deserialization. - */ - - private static void addMaterializedViewToSchemaMutation(CFMetaData table, MaterializedViewDefinition materializedView, long timestamp, Mutation mutation) - { - RowUpdateBuilder builder = new RowUpdateBuilder(MaterializedViews, timestamp, mutation) - .clustering(table.cfName, materializedView.viewName); - - for (ColumnIdentifier partitionColumn : materializedView.partitionColumns) - builder.addListEntry("target_columns", partitionColumn.toString()); - for (ColumnIdentifier clusteringColumn : materializedView.clusteringColumns) - builder = builder.addListEntry("clustering_columns", clusteringColumn.toString()); - for (ColumnIdentifier includedColumn : materializedView.included) - builder = builder.addListEntry("included_columns", includedColumn.toString()); - - builder.build(); - } - - private static void dropMaterializedViewFromSchemaMutation(CFMetaData table, MaterializedViewDefinition materializedView, long timestamp, Mutation mutation) - { - RowUpdateBuilder.deleteRow(MaterializedViews, timestamp, mutation, table.cfName, materializedView.viewName); - } - - private static void addUpdatedMaterializedViewDefinitionToSchemaMutation(CFMetaData table, MaterializedViewDefinition materializedView, long timestamp, Mutation mutation) - { - RowUpdateBuilder builder = new RowUpdateBuilder(MaterializedViews, timestamp, mutation) - .clustering(table.cfName, materializedView.viewName); - - builder.resetCollection("target_columns"); - for (ColumnIdentifier partitionColumn : materializedView.partitionColumns) - builder.addListEntry("target_columns", partitionColumn.toString()); - - builder.resetCollection("clustering_columns"); - for (ColumnIdentifier clusteringColumn : materializedView.clusteringColumns) - builder = builder.addListEntry("clustering_columns", clusteringColumn.toString()); - - builder.resetCollection("included_columns"); - for (ColumnIdentifier includedColumn : materializedView.included) - builder = builder.addListEntry("included_columns", includedColumn.toString()); - - builder.build(); - } - - /** - * Deserialize materialized views from storage-level representation. - * - * @param partition storage-level partition containing the materialized view definitions - * @return the list of processed MaterializedViewDefinitions - */ - private static MaterializedViews createMaterializedViewsFromMaterializedViewsPartition(RowIterator partition) - { - MaterializedViews.Builder views = org.apache.cassandra.schema.MaterializedViews.builder(); - String query = String.format("SELECT * FROM %s.%s", NAME, MATERIALIZED_VIEWS); - for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition)) - { - MaterializedViewDefinition mv = createMaterializedViewFromMaterializedViewRow(row); - views.add(mv); - } - return views.build(); - } - - private static MaterializedViewDefinition createMaterializedViewFromMaterializedViewRow(UntypedResultSet.Row row) - { - String name = row.getString("view_name"); - List<String> partitionColumnNames = row.getList("target_columns", UTF8Type.instance); - - String cfName = row.getString("columnfamily_name"); - List<String> clusteringColumnNames = row.getList("clustering_columns", UTF8Type.instance); - - List<ColumnIdentifier> partitionColumns = new ArrayList<>(); - for (String columnName : partitionColumnNames) - { - partitionColumns.add(ColumnIdentifier.getInterned(columnName, true)); - } - - List<ColumnIdentifier> clusteringColumns = new ArrayList<>(); - for (String columnName : clusteringColumnNames) - { - clusteringColumns.add(ColumnIdentifier.getInterned(columnName, true)); - } - - List<String> includedColumnNames = row.getList("included_columns", UTF8Type.instance); - Set<ColumnIdentifier> includedColumns = new HashSet<>(); - if (includedColumnNames != null) - { - for (String columnName : includedColumnNames) - includedColumns.add(ColumnIdentifier.getInterned(columnName, true)); - } - - return new MaterializedViewDefinition(cfName, - name, - partitionColumns, - clusteringColumns, - includedColumns); - } - - /* * UDF metadata serialization/deserialization. */ http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/service/AbstractReadExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java index 9a57f45..6696e10 100644 --- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java @@ -196,7 +196,7 @@ public abstract class AbstractReadExecutor return new SpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas); } - public static class NeverSpeculatingReadExecutor extends AbstractReadExecutor + private static class NeverSpeculatingReadExecutor extends AbstractReadExecutor { public NeverSpeculatingReadExecutor(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java index e3ba66e..8978034 100644 --- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java @@ -47,7 +47,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW public final ConsistencyLevel consistencyLevel; protected final Runnable callback; protected final Collection<InetAddress> pendingEndpoints; - protected final WriteType writeType; + private final WriteType writeType; private static final AtomicIntegerFieldUpdater<AbstractWriteResponseHandler> failuresUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, "failures"); private volatile int failures = 0; http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java deleted file mode 100644 index ac44923..0000000 --- a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java +++ /dev/null @@ -1,121 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.cassandra.service; - -import java.net.InetAddress; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; - -import org.apache.cassandra.exceptions.WriteFailureException; -import org.apache.cassandra.exceptions.WriteTimeoutException; -import org.apache.cassandra.net.MessageIn; - -public class BatchlogResponseHandler<T> extends AbstractWriteResponseHandler<T> -{ - AbstractWriteResponseHandler<T> wrapped; - BatchlogCleanup cleanup; - protected volatile int requiredBeforeFinish; - private static final AtomicIntegerFieldUpdater<BatchlogResponseHandler> requiredBeforeFinishUpdater - = AtomicIntegerFieldUpdater.newUpdater(BatchlogResponseHandler.class, "requiredBeforeFinish"); - - public BatchlogResponseHandler(AbstractWriteResponseHandler<T> wrapped, int requiredBeforeFinish, BatchlogCleanup cleanup) - { - super(wrapped.keyspace, wrapped.naturalEndpoints, wrapped.pendingEndpoints, wrapped.consistencyLevel, wrapped.callback, wrapped.writeType); - this.wrapped = wrapped; - this.requiredBeforeFinish = requiredBeforeFinish; - this.cleanup = cleanup; - } - - protected int ackCount() - { - return wrapped.ackCount(); - } - - public void response(MessageIn<T> msg) - { - wrapped.response(msg); - if (requiredBeforeFinishUpdater.decrementAndGet(this) == 0) - cleanup.run(); - } - - public boolean isLatencyForSnitch() - { - return wrapped.isLatencyForSnitch(); - } - - public void onFailure(InetAddress from) - { - wrapped.onFailure(from); - } - - public void assureSufficientLiveNodes() - { - wrapped.assureSufficientLiveNodes(); - } - - public void get() throws WriteTimeoutException, WriteFailureException - { - wrapped.get(); - } - - protected int totalBlockFor() - { - return wrapped.totalBlockFor(); - } - - protected int totalEndpoints() - { - return wrapped.totalEndpoints(); - } - - protected boolean waitingFor(InetAddress from) - { - return wrapped.waitingFor(from); - } - - protected void signal() - { - wrapped.signal(); - } - - public static class BatchlogCleanup - { - private final BatchlogCleanupCallback callback; - - protected volatile int mutationsWaitingFor; - private static final AtomicIntegerFieldUpdater<BatchlogCleanup> mutationsWaitingForUpdater - = AtomicIntegerFieldUpdater.newUpdater(BatchlogCleanup.class, "mutationsWaitingFor"); - - public BatchlogCleanup(int mutationsWaitingFor, BatchlogCleanupCallback callback) - { - this.mutationsWaitingFor = mutationsWaitingFor; - this.callback = callback; - } - - public void run() - { - if (mutationsWaitingForUpdater.decrementAndGet(this) == 0) - callback.invoke(); - } - } - - public interface BatchlogCleanupCallback - { - void invoke(); - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index fddf593..548cbc7 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -287,24 +287,6 @@ public class CassandraDaemon } } - Runnable indexRebuild = new Runnable() - { - @Override - public void run() - { - for (Keyspace keyspace : Keyspace.all()) - { - for (ColumnFamilyStore cf: keyspace.getColumnFamilyStores()) - { - cf.materializedViewManager.buildAllViews(); - } - } - } - }; - - ScheduledExecutors.optionalTasks.schedule(indexRebuild, StorageService.RING_DELAY, TimeUnit.MILLISECONDS); - - SystemKeyspace.finishStartup(); // start server internals http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index be11c77..2c5a2ab 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -31,10 +31,6 @@ import com.google.common.base.Predicate; import com.google.common.cache.CacheLoader; import com.google.common.collect.*; import com.google.common.util.concurrent.Uninterruptibles; - -import org.apache.cassandra.db.view.MaterializedViewManager; -import org.apache.cassandra.db.view.MaterializedViewUtils; -import org.apache.cassandra.metrics.*; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +58,7 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.LocalStrategy; import org.apache.cassandra.locator.TokenMetadata; +import org.apache.cassandra.metrics.*; import org.apache.cassandra.net.*; import org.apache.cassandra.service.paxos.*; import org.apache.cassandra.tracing.Tracing; @@ -94,7 +91,6 @@ public class StorageProxy implements StorageProxyMBean private static final ClientRequestMetrics writeMetrics = new ClientRequestMetrics("Write"); private static final CASClientRequestMetrics casWriteMetrics = new CASClientRequestMetrics("CASWrite"); private static final CASClientRequestMetrics casReadMetrics = new CASClientRequestMetrics("CASRead"); - private static final MVWriteMetrics mvWriteMetrics = new MVWriteMetrics("MVWrite"); private static final double CONCURRENT_SUBREQUESTS_MARGIN = 0.10; @@ -122,7 +118,7 @@ public class StorageProxy implements StorageProxyMBean throws OverloadedException { assert mutation instanceof Mutation; - sendToHintedEndpoints((Mutation) mutation, targets, responseHandler, localDataCenter, Stage.MUTATION); + sendToHintedEndpoints((Mutation) mutation, targets, responseHandler, localDataCenter); } }; @@ -626,80 +622,6 @@ public class StorageProxy implements StorageProxyMBean Tracing.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write"); } - /** - * Use this method to have these Mutations applied - * across all replicas. - * - * @param mutations the mutations to be applied across the replicas - */ - public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations) - throws UnavailableException, OverloadedException, WriteTimeoutException - { - Tracing.trace("Determining replicas for mutation"); - final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); - - long startTime = System.nanoTime(); - List<WriteResponseHandlerWrapper> wrappers = new ArrayList<>(mutations.size()); - - try - { - Token baseToken = StorageService.getPartitioner().getToken(dataKey); - - ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE; - - //Since the base -> view replication is 1:1 we only need to store the BL locally - final Collection<InetAddress> batchlogEndpoints = Collections.singleton(FBUtilities.getBroadcastAddress()); - final UUID batchUUID = UUIDGen.getTimeUUID(); - BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), - () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID)); - - // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet - for (Mutation mutation : mutations) - { - String keyspaceName = mutation.getKeyspaceName(); - Token tk = mutation.key().getToken(); - List<InetAddress> naturalEndpoints = Lists.newArrayList(MaterializedViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk)); - - WriteResponseHandlerWrapper wrapper = wrapMVBatchResponseHandler(mutation, - consistencyLevel, - consistencyLevel, - naturalEndpoints, - WriteType.BATCH, - cleanup); - - wrappers.add(wrapper); - - //Apply to local batchlog memtable in this thread - BatchlogManager.getBatchlogMutationFor(mutations, batchUUID, MessagingService.current_version).apply(); - } - - // now actually perform the writes and wait for them to complete - asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.MATERIALIZED_VIEW_MUTATION); - } - catch (WriteTimeoutException ex) - { - mvWriteMetrics.timeouts.mark(); - Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor); - throw ex; - } - catch (UnavailableException e) - { - mvWriteMetrics.unavailables.mark(); - Tracing.trace("Unavailable"); - throw e; - } - catch (OverloadedException e) - { - mvWriteMetrics.unavailables.mark(); - Tracing.trace("Overloaded"); - throw e; - } - finally - { - mvWriteMetrics.addNano(System.nanoTime() - startTime); - } - } - @SuppressWarnings("unchecked") public static void mutateWithTriggers(Collection<? extends IMutation> mutations, ConsistencyLevel consistencyLevel, @@ -708,17 +630,12 @@ public class StorageProxy implements StorageProxyMBean { Collection<Mutation> augmented = TriggerExecutor.instance.execute(mutations); - boolean updatesView = MaterializedViewManager.updatesAffectView(mutations, true); - if (augmented != null) - mutateAtomically(augmented, consistencyLevel, updatesView); + mutateAtomically(augmented, consistencyLevel); + else if (mutateAtomically) + mutateAtomically((Collection<Mutation>) mutations, consistencyLevel); else - { - if (mutateAtomically || updatesView) - mutateAtomically((Collection<Mutation>) mutations, consistencyLevel, updatesView); - else - mutate(mutations, consistencyLevel); - } + mutate(mutations, consistencyLevel); } /** @@ -729,11 +646,8 @@ public class StorageProxy implements StorageProxyMBean * * @param mutations the Mutations to be applied across the replicas * @param consistency_level the consistency level for the operation - * @param requireQuorumForRemove at least a quorum of nodes will see update before deleting batchlog */ - public static void mutateAtomically(Collection<Mutation> mutations, - ConsistencyLevel consistency_level, - boolean requireQuorumForRemove) + public static void mutateAtomically(Collection<Mutation> mutations, ConsistencyLevel consistency_level) throws UnavailableException, OverloadedException, WriteTimeoutException { Tracing.trace("Determining replicas for atomic batch"); @@ -744,49 +658,25 @@ public class StorageProxy implements StorageProxyMBean try { - - // If we are requiring quorum nodes for removal, we upgrade consistency level to QUORUM unless we already - // require ALL, or EACH_QUORUM. This is so that *at least* QUORUM nodes see the update. - ConsistencyLevel batchConsistencyLevel = requireQuorumForRemove - ? ConsistencyLevel.QUORUM - : consistency_level; - - switch (consistency_level) - { - case ALL: - case EACH_QUORUM: - batchConsistencyLevel = consistency_level; - } - - final Collection<InetAddress> batchlogEndpoints = getBatchlogEndpoints(localDataCenter, batchConsistencyLevel); - final UUID batchUUID = UUIDGen.getTimeUUID(); - BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), - new BatchlogResponseHandler.BatchlogCleanupCallback() - { - public void invoke() - { - asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID); - } - }); - // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet for (Mutation mutation : mutations) { - WriteResponseHandlerWrapper wrapper = wrapBatchResponseHandler(mutation, - consistency_level, - batchConsistencyLevel, - WriteType.BATCH, - cleanup); + WriteResponseHandlerWrapper wrapper = wrapResponseHandler(mutation, consistency_level, WriteType.BATCH); // exit early if we can't fulfill the CL at this time. wrapper.handler.assureSufficientLiveNodes(); wrappers.add(wrapper); } // write to the batchlog + Collection<InetAddress> batchlogEndpoints = getBatchlogEndpoints(localDataCenter, consistency_level); + UUID batchUUID = UUIDGen.getTimeUUID(); syncWriteToBatchlog(mutations, batchlogEndpoints, batchUUID); // now actually perform the writes and wait for them to complete - syncWriteBatchedMutations(wrappers, localDataCenter, Stage.MUTATION); + syncWriteBatchedMutations(wrappers, localDataCenter); + + // remove the batchlog entries asynchronously + asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID); } catch (UnavailableException e) { @@ -829,13 +719,13 @@ public class StorageProxy implements StorageProxyMBean WriteType.BATCH_LOG); MessageOut<Mutation> message = BatchlogManager.getBatchlogMutationFor(mutations, uuid, MessagingService.current_version) - .createMessage(MessagingService.Verb.BATCHLOG_MUTATION); + .createMessage(); for (InetAddress target : endpoints) { int targetVersion = MessagingService.instance().getVersion(target); if (canDoLocalRequest(target)) { - insertLocal(Stage.BATCHLOG_MUTATION, message.payload, handler); + insertLocal(message.payload, handler); } else if (targetVersion == MessagingService.current_version) { @@ -844,7 +734,7 @@ public class StorageProxy implements StorageProxyMBean else { MessagingService.instance().sendRR(BatchlogManager.getBatchlogMutationFor(mutations, uuid, targetVersion) - .createMessage(MessagingService.Verb.BATCHLOG_MUTATION), + .createMessage(), target, handler, false); @@ -864,43 +754,25 @@ public class StorageProxy implements StorageProxyMBean WriteType.SIMPLE); Mutation mutation = new Mutation(SystemKeyspace.NAME, StorageService.getPartitioner().decorateKey(UUIDType.instance.decompose(uuid))); mutation.add(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batchlog, mutation.key(), FBUtilities.timestampMicros(), FBUtilities.nowInSeconds())); - MessageOut<Mutation> message = mutation.createMessage(MessagingService.Verb.BATCHLOG_MUTATION); + MessageOut<Mutation> message = mutation.createMessage(); for (InetAddress target : endpoints) { if (canDoLocalRequest(target)) - insertLocal(Stage.BATCHLOG_MUTATION, message.payload, handler); + insertLocal(message.payload, handler); else MessagingService.instance().sendRR(message, target, handler, false); } } - private static void asyncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, String localDataCenter, Stage stage) - { - for (WriteResponseHandlerWrapper wrapper : wrappers) - { - Iterable<InetAddress> endpoints = Iterables.concat(wrapper.handler.naturalEndpoints, wrapper.handler.pendingEndpoints); - - try - { - sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter, stage); - } - catch (OverloadedException | WriteTimeoutException e) - { - wrapper.handler.onFailure(FBUtilities.getBroadcastAddress()); - } - } - } - - private static void syncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, String localDataCenter, Stage stage) + private static void syncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, String localDataCenter) throws WriteTimeoutException, OverloadedException { for (WriteResponseHandlerWrapper wrapper : wrappers) { Iterable<InetAddress> endpoints = Iterables.concat(wrapper.handler.naturalEndpoints, wrapper.handler.pendingEndpoints); - sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter, stage); + sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter); } - for (WriteResponseHandlerWrapper wrapper : wrappers) wrapper.handler.get(); } @@ -943,52 +815,25 @@ public class StorageProxy implements StorageProxyMBean return responseHandler; } - // same as performWrites except does not initiate writes (but does perform availability checks). - private static WriteResponseHandlerWrapper wrapBatchResponseHandler(Mutation mutation, - ConsistencyLevel consistency_level, - ConsistencyLevel batchConsistencyLevel, - WriteType writeType, - BatchlogResponseHandler.BatchlogCleanup cleanup) + // same as above except does not initiate writes (but does perform availability checks). + private static WriteResponseHandlerWrapper wrapResponseHandler(Mutation mutation, ConsistencyLevel consistency_level, WriteType writeType) { - Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName()); - AbstractReplicationStrategy rs = keyspace.getReplicationStrategy(); + AbstractReplicationStrategy rs = Keyspace.open(mutation.getKeyspaceName()).getReplicationStrategy(); String keyspaceName = mutation.getKeyspaceName(); Token tk = mutation.key().getToken(); List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk); Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); - AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType); - BatchlogResponseHandler<IMutation> batchHandler = new BatchlogResponseHandler<>(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup); - return new WriteResponseHandlerWrapper(batchHandler, mutation); - } - - /** - * Same as performWrites except does not initiate writes (but does perform availability checks). - * Keeps track of MVWriteMetrics - */ - private static WriteResponseHandlerWrapper wrapMVBatchResponseHandler(Mutation mutation, - ConsistencyLevel consistency_level, - ConsistencyLevel batchConsistencyLevel, - List<InetAddress> naturalEndpoints, - WriteType writeType, - BatchlogResponseHandler.BatchlogCleanup cleanup) - { - Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName()); - AbstractReplicationStrategy rs = keyspace.getReplicationStrategy(); - String keyspaceName = mutation.getKeyspaceName(); - Token tk = mutation.key().getToken(); - Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName); - AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType); - BatchlogResponseHandler<IMutation> batchHandler = new MVWriteMetricsWrapped(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup); - return new WriteResponseHandlerWrapper(batchHandler, mutation); + AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType); + return new WriteResponseHandlerWrapper(responseHandler, mutation); } // used by atomic_batch_mutate to decouple availability check from the write itself, caches consistency level and endpoints. private static class WriteResponseHandlerWrapper { - final BatchlogResponseHandler<IMutation> handler; + final AbstractWriteResponseHandler<IMutation> handler; final Mutation mutation; - WriteResponseHandlerWrapper(BatchlogResponseHandler<IMutation> handler, Mutation mutation) + WriteResponseHandlerWrapper(AbstractWriteResponseHandler<IMutation> handler, Mutation mutation) { this.handler = handler; this.mutation = mutation; @@ -1041,8 +886,7 @@ public class StorageProxy implements StorageProxyMBean public static void sendToHintedEndpoints(final Mutation mutation, Iterable<InetAddress> targets, AbstractWriteResponseHandler<IMutation> responseHandler, - String localDataCenter, - Stage stage) + String localDataCenter) throws OverloadedException { // extra-datacenter replicas, grouped by dc @@ -1106,7 +950,7 @@ public class StorageProxy implements StorageProxyMBean } if (insertLocal) - insertLocal(stage, mutation, responseHandler); + insertLocal(mutation, responseHandler); if (dcGroups != null) { @@ -1215,9 +1059,10 @@ public class StorageProxy implements StorageProxyMBean } } - private static void insertLocal(Stage stage, final Mutation mutation, final AbstractWriteResponseHandler<IMutation> responseHandler) + private static void insertLocal(final Mutation mutation, final AbstractWriteResponseHandler<IMutation> responseHandler) { - StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable() + + StageManager.getStage(Stage.MUTATION).maybeExecuteImmediately(new LocalMutationRunnable() { public void runMayThrow() { @@ -1228,8 +1073,7 @@ public class StorageProxy implements StorageProxyMBean } catch (Exception ex) { - if (!(ex instanceof WriteTimeoutException)) - logger.error("Failed to apply mutation locally : {}", ex); + logger.error("Failed to apply mutation locally : {}", ex); responseHandler.onFailure(FBUtilities.getBroadcastAddress()); } } @@ -1349,7 +1193,7 @@ public class StorageProxy implements StorageProxyMBean Set<InetAddress> remotes = Sets.difference(ImmutableSet.copyOf(targets), ImmutableSet.of(FBUtilities.getBroadcastAddress())); if (!remotes.isEmpty()) - sendToHintedEndpoints(result, remotes, responseHandler, localDataCenter, Stage.COUNTER_MUTATION); + sendToHintedEndpoints(result, remotes, responseHandler, localDataCenter); } }; } @@ -2279,24 +2123,6 @@ public class StorageProxy implements StorageProxyMBean } /** - * This class captures metrics for materialized views writes. - */ - private static class MVWriteMetricsWrapped extends BatchlogResponseHandler<IMutation> - { - public MVWriteMetricsWrapped(AbstractWriteResponseHandler<IMutation> writeHandler, int i, BatchlogCleanup cleanup) - { - super(writeHandler, i, cleanup); - mvWriteMetrics.viewReplicasAttempted.inc(totalEndpoints()); - } - - public void response(MessageIn<IMutation> msg) - { - super.response(msg); - mvWriteMetrics.viewReplicasSuccess.inc(); - } - } - - /** * A Runnable that aborts if it doesn't start running before it times out */ private static abstract class DroppableRunnable implements Runnable http://git-wip-us.apache.org/repos/asf/cassandra/blob/24d185d7/src/java/org/apache/cassandra/service/StorageService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 5049337..da53bf7 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -297,8 +297,6 @@ public class StorageService extends NotificationBroadcasterSupport implements IE /* register the verb handlers */ MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MUTATION, new MutationVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCHLOG_MUTATION, new MutationVerbHandler()); - MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MATERIALIZED_VIEW_MUTATION, new MutationVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ_REPAIR, new ReadRepairVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ, new ReadCommandVerbHandler()); MessagingService.instance().registerVerbHandlers(MessagingService.Verb.RANGE_SLICE, new ReadCommandVerbHandler()); @@ -631,14 +629,9 @@ public class StorageService extends NotificationBroadcasterSupport implements IE public void runMayThrow() throws InterruptedException { inShutdownHook = true; - ExecutorService materializedViewMutationStage = StageManager.getStage(Stage.MATERIALIZED_VIEW_MUTATION); - ExecutorService batchlogMutationStage = StageManager.getStage(Stage.BATCHLOG_MUTATION); ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION); ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION); - if (mutationStage.isShutdown() - && counterMutationStage.isShutdown() - && batchlogMutationStage.isShutdown() - && materializedViewMutationStage.isShutdown()) + if (mutationStage.isShutdown() && counterMutationStage.isShutdown()) return; // drained already if (daemon != null) @@ -649,12 +642,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE // In-progress writes originating here could generate hints to be written, so shut down MessagingService // before mutation stage, so we can get all the hints saved before shutting down MessagingService.instance().shutdown(); - materializedViewMutationStage.shutdown(); - batchlogMutationStage.shutdown(); counterMutationStage.shutdown(); mutationStage.shutdown(); - materializedViewMutationStage.awaitTermination(3600, TimeUnit.SECONDS); - batchlogMutationStage.awaitTermination(3600, TimeUnit.SECONDS); counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS); mutationStage.awaitTermination(3600, TimeUnit.SECONDS); StorageProxy.instance.verifyNoHintsInProgress(); @@ -3831,13 +3820,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE inShutdownHook = true; ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION); - ExecutorService batchlogMutationStage = StageManager.getStage(Stage.BATCHLOG_MUTATION); - ExecutorService materializedViewMutationStage = StageManager.getStage(Stage.MATERIALIZED_VIEW_MUTATION); ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION); - if (mutationStage.isTerminated() - && counterMutationStage.isTerminated() - && batchlogMutationStage.isTerminated() - && materializedViewMutationStage.isTerminated()) + if (mutationStage.isTerminated() && counterMutationStage.isTerminated()) { logger.warn("Cannot drain node (did it already happen?)"); return; @@ -3851,12 +3835,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE MessagingService.instance().shutdown(); setMode(Mode.DRAINING, "clearing mutation stage", false); - materializedViewMutationStage.shutdown(); - batchlogMutationStage.shutdown(); counterMutationStage.shutdown(); mutationStage.shutdown(); - materializedViewMutationStage.awaitTermination(3600, TimeUnit.SECONDS); - batchlogMutationStage.awaitTermination(3600, TimeUnit.SECONDS); counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS); mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
