http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/db/view/MaterializedViewUtils.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/MaterializedViewUtils.java b/src/java/org/apache/cassandra/db/view/MaterializedViewUtils.java new file mode 100644 index 0000000..4dfea75 --- /dev/null +++ b/src/java/org/apache/cassandra/db/view/MaterializedViewUtils.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.view; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.List; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.Keyspace; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.AbstractReplicationStrategy; +import org.apache.cassandra.utils.FBUtilities; + +public final class MaterializedViewUtils +{ + private MaterializedViewUtils() + { + } + + /** + * Calculate the natural endpoint for the view. + * + * The view natural endpoint is the endpoint which has the same cardinality for the view token as this node has + * for the base token. The cardinality is the smallest replication factor at which a node would store a piece of + * data for a given token. + * + * For example, if we have the following ring: + * A, T1 -> B, T2 -> C, T3 -> A + * + * For the token T1, at RF=1, A would be included, so A's cardinality for T1 is 1. For the token T1, at RF=2, B would + * be included, so B's cardinality for token T1 is 2. For token T3, at RF=2, A would be included, so A's cardinality + * for T3 is 2. 
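+ *
+ * Equivalently, a node's cardinality for a token is its one-based position in the list of natural endpoints for
+ * that token, so each base replica pairs with the view replica that occupies the same position for the view token.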
+ * + * For a view whose base token is T1 and whose view token is T3, the pairings between the nodes would be: + * A writes to C (A's cardinality is 1 for T1, and C's cardinality is 1 for T3) + * B writes to A (B's cardinality is 2 for T1, and A's cardinality is 2 for T3) + * C writes to B (C's cardinality is 3 for T1, and B's cardinality is 3 for T3) + * + * @throws RuntimeException if this method is called using a base token which does not belong to this replica + */ + public static InetAddress getViewNaturalEndpoint(String keyspaceName, Token baseToken, Token viewToken) + { + AbstractReplicationStrategy replicationStrategy = Keyspace.open(keyspaceName).getReplicationStrategy(); + + String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); + List<InetAddress> localBaseEndpoints = new ArrayList<>(); + List<InetAddress> localViewEndpoints = new ArrayList<>(); + for (InetAddress baseEndpoint : replicationStrategy.getNaturalEndpoints(baseToken)) + { + if (DatabaseDescriptor.getEndpointSnitch().getDatacenter(baseEndpoint).equals(localDataCenter)) + localBaseEndpoints.add(baseEndpoint); + } + + for (InetAddress viewEndpoint : replicationStrategy.getNaturalEndpoints(viewToken)) + { + // If we are a base endpoint which is also a view replica, we use ourselves as our view replica + if (viewEndpoint.equals(FBUtilities.getBroadcastAddress())) + return viewEndpoint; + + // We have to remove any endpoint which is shared between the base and the view, as it will select itself + // and throw off the counts otherwise. + if (localBaseEndpoints.contains(viewEndpoint)) + localBaseEndpoints.remove(viewEndpoint); + else if (DatabaseDescriptor.getEndpointSnitch().getDatacenter(viewEndpoint).equals(localDataCenter)) + localViewEndpoints.add(viewEndpoint); + } + + // The replication strategy will be the same for the base and the view, as they must belong to the same keyspace. + // Since the same replication strategy is used, the same placement should be used and we should get the same + // number of replicas for all of the tokens in the ring. + assert localBaseEndpoints.size() == localViewEndpoints.size() : "Replication strategy should have the same number of endpoints for the base and the view"; + int baseIdx = localBaseEndpoints.indexOf(FBUtilities.getBroadcastAddress()); + if (baseIdx < 0) + throw new RuntimeException("Trying to get the view natural endpoint on a non-data replica"); + + return localViewEndpoints.get(baseIdx); + } +}
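As an illustration of the pairing rule above, here is a minimal standalone sketch (not part of the patch): each base replica writes to the view endpoint that occupies the same position in the two natural-endpoint lists. Datacenter filtering and the shared-endpoint adjustment are omitted, and the String endpoints and the viewReplicaFor helper are hypothetical.

    import java.util.Arrays;
    import java.util.List;

    public class ViewPairingSketch
    {
        // Pair this node with the view endpoint whose position (the javadoc's
        // "cardinality") for the view token equals this node's position for
        // the base token.
        static String viewReplicaFor(String self, List<String> baseEndpoints, List<String> viewEndpoints)
        {
            int idx = baseEndpoints.indexOf(self);
            if (idx < 0)
                throw new RuntimeException("Not a replica for the base token");
            return viewEndpoints.get(idx);
        }

        public static void main(String[] args)
        {
            List<String> base = Arrays.asList("A", "B", "C"); // endpoints for base token T1, in cardinality order
            List<String> view = Arrays.asList("C", "A", "B"); // endpoints for view token T3, in cardinality order
            System.out.println(viewReplicaFor("A", base, view)); // A writes to C
            System.out.println(viewReplicaFor("B", base, view)); // B writes to A
            System.out.println(viewReplicaFor("C", base, view)); // C writes to B
        }
    }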
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/db/view/TemporalRow.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/TemporalRow.java b/src/java/org/apache/cassandra/db/view/TemporalRow.java new file mode 100644 index 0000000..53e4e91 --- /dev/null +++ b/src/java/org/apache/cassandra/db/view/TemporalRow.java @@ -0,0 +1,414 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.db.view; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + +import com.google.common.collect.Iterables; + +import org.apache.cassandra.config.CFMetaData; +import org.apache.cassandra.config.ColumnDefinition; +import org.apache.cassandra.cql3.ColumnIdentifier; +import org.apache.cassandra.db.CBuilder; +import org.apache.cassandra.db.Clustering; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.Conflicts; +import org.apache.cassandra.db.DecoratedKey; +import org.apache.cassandra.db.DeletionInfo; +import org.apache.cassandra.db.DeletionTime; +import org.apache.cassandra.db.LivenessInfo; +import org.apache.cassandra.db.RangeTombstone; +import org.apache.cassandra.db.Slice; +import org.apache.cassandra.db.marshal.CompositeType; +import org.apache.cassandra.db.partitions.AbstractThreadUnsafePartition; +import org.apache.cassandra.db.rows.BufferCell; +import org.apache.cassandra.db.rows.Cell; +import org.apache.cassandra.db.rows.CellPath; +import org.apache.cassandra.db.rows.Row; +import org.apache.cassandra.utils.ByteBufferUtil; +import org.apache.cassandra.utils.FBUtilities; + +/** + * Represents a single CQL Row in a base table, with both the currently persisted value and the update's value. The + * values are stored in timestamp order, and each value also records whether it belongs to the currently persisted + * data, allowing a {@link TemporalRow.Resolver} to determine whether a value is an old value that has been updated; + * if it sorts after the update's value, then it does not qualify. 
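+ *
+ * For example, if the persisted cell holds value 'a' at timestamp 1 and the update writes 'b' at timestamp 2,
+ * {@code oldValueIfUpdated} resolves to the 'a' cell (an old value replaced by the update), {@code latest}
+ * resolves to the 'b' cell, and {@code earliest} to the 'a' cell.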
+ */ +public class TemporalRow +{ + private static final int NO_TTL = LivenessInfo.NO_TTL; + private static final long NO_TIMESTAMP = LivenessInfo.NO_TIMESTAMP; + private static final int NO_DELETION_TIME = DeletionTime.LIVE.localDeletionTime(); + + public interface Resolver + { + /** + * @param cells Iterable of all cells for a certain TemporalRow's Cell, in timestamp sorted order + * @return A single TemporalCell from the iterable which satisfies the resolution criteria, or null if + * there is no cell which qualifies + */ + TemporalCell resolve(Iterable<TemporalCell> cells); + } + + /** + * Returns the first value in the iterable if it is from the set of persisted cells, and the cell which results from + * reconciliation of the remaining cells does not have the same value. + */ + public static final Resolver oldValueIfUpdated = cells -> { + Iterator<TemporalCell> iterator = cells.iterator(); + if (!iterator.hasNext()) + return null; + + TemporalCell initial = iterator.next(); + if (initial.isNew || !iterator.hasNext()) + return null; + + TemporalCell value = initial; + while (iterator.hasNext()) + value = value.reconcile(iterator.next()); + + return ByteBufferUtil.compareUnsigned(initial.value, value.value) != 0 ? initial : null; + }; + + public static final Resolver earliest = cells -> { + Iterator<TemporalCell> iterator = cells.iterator(); + if (!iterator.hasNext()) + return null; + return iterator.next(); + }; + + public static final Resolver latest = cells -> { + Iterator<TemporalCell> iterator = cells.iterator(); + if (!iterator.hasNext()) + return null; + + TemporalCell value = iterator.next(); + while (iterator.hasNext()) + value = value.reconcile(iterator.next()); + + return value; + }; + + private static class TemporalCell + { + public final ByteBuffer value; + public final long timestamp; + public final int ttl; + public final int localDeletionTime; + public final boolean isNew; + + private TemporalCell(ByteBuffer value, long timestamp, int ttl, int localDeletionTime, boolean isNew) + { + this.value = value; + this.timestamp = timestamp; + this.ttl = ttl; + this.localDeletionTime = localDeletionTime; + this.isNew = isNew; + } + + public TemporalCell reconcile(TemporalCell that) + { + int now = FBUtilities.nowInSeconds(); + Conflicts.Resolution resolution = Conflicts.resolveRegular(that.timestamp, + that.isLive(now), + that.localDeletionTime, + that.value, + this.timestamp, + this.isLive(now), + this.localDeletionTime, + this.value); + assert resolution != Conflicts.Resolution.MERGE; + if (resolution == Conflicts.Resolution.LEFT_WINS) + return that; + return this; + } + + private boolean isLive(int now) + { + return localDeletionTime == NO_DELETION_TIME || (ttl != NO_TTL && now < localDeletionTime); + } + + public Cell cell(ColumnDefinition definition, CellPath cellPath) + { + return new BufferCell(definition, timestamp, ttl, localDeletionTime, value, cellPath); + } + } + + private final ColumnFamilyStore baseCfs; + private final java.util.Set<ColumnIdentifier> viewPrimaryKey; + private final ByteBuffer basePartitionKey; + public final Map<ColumnIdentifier, ByteBuffer> clusteringColumns; + public final int nowInSec; + private final Map<ColumnIdentifier, Map<CellPath, SortedMap<Long, TemporalCell>>> columnValues = new HashMap<>(); + private int viewClusteringTtl = NO_TTL; + private long viewClusteringTimestamp = NO_TIMESTAMP; + private int viewClusteringLocalDeletionTime = NO_DELETION_TIME; + + TemporalRow(ColumnFamilyStore baseCfs, java.util.Set<ColumnIdentifier> 
viewPrimaryKey, ByteBuffer key, Row row, int nowInSec, boolean isNew) + { + this.baseCfs = baseCfs; + this.viewPrimaryKey = viewPrimaryKey; + this.basePartitionKey = key; + this.nowInSec = nowInSec; + clusteringColumns = new HashMap<>(); + LivenessInfo liveness = row.primaryKeyLivenessInfo(); + this.viewClusteringLocalDeletionTime = minValueIfSet(viewClusteringLocalDeletionTime, row.deletion().localDeletionTime(), NO_DELETION_TIME); + this.viewClusteringTimestamp = minValueIfSet(viewClusteringTimestamp, liveness.timestamp(), NO_TIMESTAMP); + this.viewClusteringTtl = minValueIfSet(viewClusteringTtl, liveness.ttl(), NO_TTL); + + List<ColumnDefinition> clusteringDefs = baseCfs.metadata.clusteringColumns(); + for (int i = 0; i < clusteringDefs.size(); i++) + { + ColumnDefinition cdef = clusteringDefs.get(i); + clusteringColumns.put(cdef.name, row.clustering().get(i)); + + addColumnValue(cdef.name, null, NO_TIMESTAMP, NO_TTL, NO_DELETION_TIME, row.clustering().get(i), isNew); + } + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TemporalRow that = (TemporalRow) o; + + if (!clusteringColumns.equals(that.clusteringColumns)) return false; + if (!basePartitionKey.equals(that.basePartitionKey)) return false; + + return true; + } + + @Override + public int hashCode() + { + int result = basePartitionKey.hashCode(); + result = 31 * result + clusteringColumns.hashCode(); + return result; + } + + public void addColumnValue(ColumnIdentifier identifier, + CellPath cellPath, + long timestamp, + int ttl, + int localDeletionTime, + ByteBuffer value, boolean isNew) + { + if (!columnValues.containsKey(identifier)) + columnValues.put(identifier, new HashMap<>()); + + Map<CellPath, SortedMap<Long, TemporalCell>> innerMap = columnValues.get(identifier); + + if (!innerMap.containsKey(cellPath)) + innerMap.put(cellPath, new TreeMap<>()); + + // If this column is part of the view's primary keys + if (viewPrimaryKey.contains(identifier)) + { + this.viewClusteringTtl = minValueIfSet(this.viewClusteringTtl, ttl, NO_TTL); + this.viewClusteringTimestamp = minValueIfSet(this.viewClusteringTimestamp, timestamp, NO_TIMESTAMP); + this.viewClusteringLocalDeletionTime = minValueIfSet(this.viewClusteringLocalDeletionTime, localDeletionTime, NO_DELETION_TIME); + } + + innerMap.get(cellPath).put(timestamp, new TemporalCell(value, timestamp, ttl, localDeletionTime, isNew)); + } + + private static int minValueIfSet(int existing, int update, int defaultValue) + { + if (existing == defaultValue) + return update; + if (update == defaultValue) + return existing; + return Math.min(existing, update); + } + + private static long minValueIfSet(long existing, long update, long defaultValue) + { + if (existing == defaultValue) + return update; + if (update == defaultValue) + return existing; + return Math.min(existing, update); + } + + public int viewClusteringTtl() + { + return viewClusteringTtl; + } + + public long viewClusteringTimestamp() + { + return viewClusteringTimestamp; + } + + public int viewClusteringLocalDeletionTime() + { + return viewClusteringLocalDeletionTime; + } + + public void addCell(Cell cell, boolean isNew) + { + addColumnValue(cell.column().name, cell.path(), cell.timestamp(), cell.ttl(), cell.localDeletionTime(), cell.value(), isNew); + } + + // The Definition here is actually the *base table* definition + public ByteBuffer clusteringValue(ColumnDefinition definition, Resolver resolver) + { + ColumnDefinition 
baseDefinition = definition.cfName.equals(baseCfs.name) + ? definition + : baseCfs.metadata.getColumnDefinition(definition.name); + + if (baseDefinition.isPartitionKey()) + { + if (baseDefinition.isOnAllComponents()) + return basePartitionKey; + else + { + CompositeType keyComparator = (CompositeType) baseCfs.metadata.getKeyValidator(); + ByteBuffer[] components = keyComparator.split(basePartitionKey); + return components[baseDefinition.position()]; + } + } + else + { + ColumnIdentifier columnIdentifier = baseDefinition.name; + + if (clusteringColumns.containsKey(columnIdentifier)) + return clusteringColumns.get(columnIdentifier); + + Collection<org.apache.cassandra.db.rows.Cell> val = values(definition, resolver); + if (val != null && val.size() == 1) + return Iterables.getOnlyElement(val).value(); + } + return null; + } + + public DeletionTime deletionTime(AbstractThreadUnsafePartition partition) + { + DeletionInfo deletionInfo = partition.deletionInfo(); + if (!deletionInfo.getPartitionDeletion().isLive()) + return deletionInfo.getPartitionDeletion(); + + Clustering baseClustering = baseClusteringBuilder().build(); + RangeTombstone clusterTombstone = deletionInfo.rangeCovering(baseClustering); + if (clusterTombstone != null) + return clusterTombstone.deletionTime(); + + Row row = partition.getRow(baseClustering); + return row == null || row.deletion().isLive() ? DeletionTime.LIVE : row.deletion(); + } + + public Collection<org.apache.cassandra.db.rows.Cell> values(ColumnDefinition definition, Resolver resolver) + { + Map<CellPath, SortedMap<Long, TemporalCell>> innerMap = columnValues.get(definition.name); + if (innerMap == null) + { + return Collections.emptyList(); + } + + Collection<org.apache.cassandra.db.rows.Cell> value = new ArrayList<>(); + for (Map.Entry<CellPath, SortedMap<Long, TemporalCell>> pathAndCells : innerMap.entrySet()) + { + TemporalCell cell = resolver.resolve(pathAndCells.getValue().values()); + + if (cell != null) + value.add(cell.cell(definition, pathAndCells.getKey())); + } + return value; + } + + public Slice baseSlice() + { + return baseClusteringBuilder().buildSlice(); + } + + private CBuilder baseClusteringBuilder() + { + CFMetaData metadata = baseCfs.metadata; + CBuilder builder = CBuilder.create(metadata.comparator); + + ByteBuffer[] buffers = new ByteBuffer[clusteringColumns.size()]; + for (Map.Entry<ColumnIdentifier, ByteBuffer> buffer : clusteringColumns.entrySet()) + buffers[metadata.getColumnDefinition(buffer.getKey()).position()] = buffer.getValue(); + + for (ByteBuffer byteBuffer : buffers) + builder = builder.add(byteBuffer); + + return builder; + } + + static class Set implements Iterable<TemporalRow> + { + private final ColumnFamilyStore baseCfs; + private final java.util.Set<ColumnIdentifier> viewPrimaryKey; + private final ByteBuffer key; + public final DecoratedKey dk; + private final Map<Clustering, TemporalRow> clusteringToRow; + final int nowInSec = FBUtilities.nowInSeconds(); + + Set(ColumnFamilyStore baseCfs, java.util.Set<ColumnIdentifier> viewPrimaryKey, ByteBuffer key) + { + this.baseCfs = baseCfs; + this.viewPrimaryKey = viewPrimaryKey; + this.key = key; + this.dk = baseCfs.partitioner.decorateKey(key); + this.clusteringToRow = new HashMap<>(); + } + + public Iterator<TemporalRow> iterator() + { + return clusteringToRow.values().iterator(); + } + + public TemporalRow getClustering(Clustering clustering) + { + return clusteringToRow.get(clustering); + } + + public void addRow(Row row, boolean isNew) + { + TemporalRow temporalRow = 
clusteringToRow.get(row.clustering()); + if (temporalRow == null) + { + temporalRow = new TemporalRow(baseCfs, viewPrimaryKey, key, row, nowInSec, isNew); + clusteringToRow.put(row.clustering(), temporalRow); + } + + for (Cell cell: row.cells()) + { + temporalRow.addCell(cell, isNew); + } + } + + public int size() + { + return clusteringToRow.size(); + } + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java index 8f52bb1..9c886b0 100644 --- a/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java +++ b/src/java/org/apache/cassandra/metrics/KeyspaceMetrics.java @@ -87,7 +87,7 @@ public class KeyspaceMetrics public final LatencyMetrics casPropose; /** CAS Commit metrics */ public final LatencyMetrics casCommit; - + public final MetricNameFactory factory; private Keyspace keyspace; http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/metrics/MVWriteMetrics.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/metrics/MVWriteMetrics.java b/src/java/org/apache/cassandra/metrics/MVWriteMetrics.java new file mode 100644 index 0000000..39a5574 --- /dev/null +++ b/src/java/org/apache/cassandra/metrics/MVWriteMetrics.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.metrics; + +import com.codahale.metrics.Counter; + +import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics; + +public class MVWriteMetrics extends ClientRequestMetrics +{ + public final Counter viewReplicasAttempted; + public final Counter viewReplicasSuccess; + + public MVWriteMetrics(String scope) { + super(scope); + viewReplicasAttempted = Metrics.counter(factory.createMetricName("ViewReplicasAttempted")); + viewReplicasSuccess = Metrics.counter(factory.createMetricName("ViewReplicasSuccess")); + } + + public void release() + { + super.release(); + Metrics.remove(factory.createMetricName("ViewReplicasAttempted")); + Metrics.remove(factory.createMetricName("ViewReplicasSuccess")); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/net/MessagingService.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index a5f3601..4f15da2 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -134,6 +134,8 @@ public final class MessagingService implements MessagingServiceMBean PAXOS_PROPOSE, PAXOS_COMMIT, @Deprecated PAGED_RANGE, + BATCHLOG_MUTATION, + MATERIALIZED_VIEW_MUTATION, // remember to add new verbs at the end, since we serialize by ordinal UNUSED_1, UNUSED_2, @@ -145,6 +147,8 @@ public final class MessagingService implements MessagingServiceMBean {{ put(Verb.MUTATION, Stage.MUTATION); put(Verb.COUNTER_MUTATION, Stage.COUNTER_MUTATION); + put(Verb.MATERIALIZED_VIEW_MUTATION, Stage.MATERIALIZED_VIEW_MUTATION); + put(Verb.BATCHLOG_MUTATION, Stage.BATCHLOG_MUTATION); put(Verb.READ_REPAIR, Stage.MUTATION); put(Verb.TRUNCATE, Stage.MUTATION); put(Verb.PAXOS_PREPARE, Stage.MUTATION); @@ -203,6 +207,8 @@ public final class MessagingService implements MessagingServiceMBean put(Verb.INTERNAL_RESPONSE, CallbackDeterminedSerializer.instance); put(Verb.MUTATION, Mutation.serializer); + put(Verb.BATCHLOG_MUTATION, Mutation.serializer); + put(Verb.MATERIALIZED_VIEW_MUTATION, Mutation.serializer); put(Verb.READ_REPAIR, Mutation.serializer); put(Verb.READ, ReadCommand.serializer); //put(Verb.RANGE_SLICE, ReadCommand.legacyRangeSliceCommandSerializer); @@ -229,6 +235,8 @@ public final class MessagingService implements MessagingServiceMBean public static final EnumMap<Verb, IVersionedSerializer<?>> callbackDeserializers = new EnumMap<Verb, IVersionedSerializer<?>>(Verb.class) {{ put(Verb.MUTATION, WriteResponse.serializer); + put(Verb.BATCHLOG_MUTATION, WriteResponse.serializer); + put(Verb.MATERIALIZED_VIEW_MUTATION, WriteResponse.serializer); put(Verb.READ_REPAIR, WriteResponse.serializer); put(Verb.COUNTER_MUTATION, WriteResponse.serializer); put(Verb.RANGE_SLICE, ReadResponse.legacyRangeSliceReplySerializer); @@ -291,6 +299,8 @@ public final class MessagingService implements MessagingServiceMBean */ public static final EnumSet<Verb> DROPPABLE_VERBS = EnumSet.of(Verb._TRACE, Verb.MUTATION, + Verb.BATCHLOG_MUTATION, //FIXME: should this be droppable?? 
+ Verb.MATERIALIZED_VIEW_MUTATION, Verb.COUNTER_MUTATION, Verb.READ_REPAIR, Verb.READ, @@ -618,7 +628,10 @@ public final class MessagingService implements MessagingServiceMBean ConsistencyLevel consistencyLevel, boolean allowHints) { - assert message.verb == Verb.MUTATION || message.verb == Verb.COUNTER_MUTATION; + assert message.verb == Verb.MUTATION + || message.verb == Verb.BATCHLOG_MUTATION + || message.verb == Verb.MATERIALIZED_VIEW_MUTATION + || message.verb == Verb.COUNTER_MUTATION; int messageId = nextId(); CallbackInfo previous = callbacks.put(messageId, http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java index 7326fa9..1c21e41 100644 --- a/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java +++ b/src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java @@ -307,7 +307,9 @@ public final class LegacySchemaMigrator defaultValidator); } - CFMetaData cfm = CFMetaData.create(ksName, cfName, cfId, isDense, isCompound, isSuper, isCounter, columnDefs); + // The legacy schema did not have views, so we know that we are not loading a materialized view + boolean isMaterializedView = false; + CFMetaData cfm = CFMetaData.create(ksName, cfName, cfId, isDense, isCompound, isSuper, isCounter, isMaterializedView, columnDefs); cfm.readRepairChance(tableRow.getDouble("read_repair_chance")); cfm.dcLocalReadRepairChance(tableRow.getDouble("local_read_repair_chance")); http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/schema/MaterializedViews.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/MaterializedViews.java b/src/java/org/apache/cassandra/schema/MaterializedViews.java new file mode 100644 index 0000000..1c55736 --- /dev/null +++ b/src/java/org/apache/cassandra/schema/MaterializedViews.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.schema; + + +import java.util.Iterator; +import java.util.Optional; + +import com.google.common.collect.ImmutableMap; + +import org.apache.cassandra.config.MaterializedViewDefinition; + +import static com.google.common.collect.Iterables.filter; + +public final class MaterializedViews implements Iterable<MaterializedViewDefinition> +{ + private final ImmutableMap<String, MaterializedViewDefinition> materializedViews; + + private MaterializedViews(Builder builder) + { + materializedViews = builder.materializedViews.build(); + } + + public static Builder builder() + { + return new Builder(); + } + + public static MaterializedViews none() + { + return builder().build(); + } + + public Iterator<MaterializedViewDefinition> iterator() + { + return materializedViews.values().iterator(); + } + + public int size() + { + return materializedViews.size(); + } + + public boolean isEmpty() + { + return materializedViews.isEmpty(); + } + + /** + * Get the materialized view with the specified name + * + * @param name a non-qualified materialized view name + * @return an empty {@link Optional} if the materialized view name is not found; a non-empty optional of {@link MaterializedViewDefinition} otherwise + */ + public Optional<MaterializedViewDefinition> get(String name) + { + return Optional.ofNullable(materializedViews.get(name)); + } + + /** + * Create a MaterializedViews instance with the provided materialized view added + */ + public MaterializedViews with(MaterializedViewDefinition materializedView) + { + if (get(materializedView.viewName).isPresent()) + throw new IllegalStateException(String.format("Materialized View %s already exists", materializedView.viewName)); + + return builder().add(this).add(materializedView).build(); + } + + /** + * Creates a MaterializedViews instance with the materialized view of the provided name removed + */ + public MaterializedViews without(String name) + { + MaterializedViewDefinition materializedView = + get(name).orElseThrow(() -> new IllegalStateException(String.format("Materialized View %s doesn't exist", name))); + + return builder().add(filter(this, v -> v != materializedView)).build(); + } + + /** + * Creates a MaterializedViews instance which contains an updated materialized view + */ + public MaterializedViews replace(MaterializedViewDefinition materializedView) + { + return without(materializedView.viewName).with(materializedView); + } + + @Override + public boolean equals(Object o) + { + return this == o || (o instanceof MaterializedViews && materializedViews.equals(((MaterializedViews) o).materializedViews)); + } + + @Override + public int hashCode() + { + return materializedViews.hashCode(); + } + + @Override + public String toString() + { + return materializedViews.values().toString(); + } + + public static final class Builder + { + final ImmutableMap.Builder<String, MaterializedViewDefinition> materializedViews = new ImmutableMap.Builder<>(); + + private Builder() + { + } + + public MaterializedViews build() + { + return new MaterializedViews(this); + } + + public Builder add(MaterializedViewDefinition materializedView) + { + materializedViews.put(materializedView.viewName, materializedView); + return this; + } + + public Builder add(Iterable<MaterializedViewDefinition> materializedViews) + { + materializedViews.forEach(this::add); + return this; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/schema/SchemaKeyspace.java 
---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java index 2bc7b0c..2150f4a 100644 --- a/src/java/org/apache/cassandra/schema/SchemaKeyspace.java +++ b/src/java/org/apache/cassandra/schema/SchemaKeyspace.java @@ -76,12 +76,13 @@ public final class SchemaKeyspace public static final String COLUMNS = "columns"; public static final String DROPPED_COLUMNS = "dropped_columns"; public static final String TRIGGERS = "triggers"; + public static final String MATERIALIZED_VIEWS = "materialized_views"; public static final String TYPES = "types"; public static final String FUNCTIONS = "functions"; public static final String AGGREGATES = "aggregates"; public static final List<String> ALL = - ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, TYPES, FUNCTIONS, AGGREGATES); + ImmutableList.of(KEYSPACES, TABLES, COLUMNS, TRIGGERS, MATERIALIZED_VIEWS, TYPES, FUNCTIONS, AGGREGATES); private static final CFMetaData Keyspaces = compile(KEYSPACES, @@ -152,6 +153,18 @@ public final class SchemaKeyspace + "trigger_options map<text, text>," + "PRIMARY KEY ((keyspace_name), table_name, trigger_name))"); + private static final CFMetaData MaterializedViews = + compile(MATERIALIZED_VIEWS, + "materialized views definitions", + "CREATE TABLE %s (" + + "keyspace_name text," + + "table_name text," + + "view_name text," + + "target_columns list<text>," + + "clustering_columns list<text>," + + "included_columns list<text>," + + "PRIMARY KEY ((keyspace_name), table_name, view_name))"); + private static final CFMetaData Types = compile(TYPES, "user defined type definitions", @@ -193,7 +206,7 @@ public final class SchemaKeyspace + "PRIMARY KEY ((keyspace_name), aggregate_name, signature))"); public static final List<CFMetaData> All = - ImmutableList.of(Keyspaces, Tables, Columns, DroppedColumns, Triggers, Types, Functions, Aggregates); + ImmutableList.of(Keyspaces, Tables, Columns, Triggers, DroppedColumns, MaterializedViews, Types, Functions, Aggregates); private static CFMetaData compile(String name, String description, String schema) { @@ -688,6 +701,8 @@ public final class SchemaKeyspace Mutation mutation = new Mutation(NAME, getSchemaKSDecoratedKey(keyspace.name)); for (CFMetaData schemaTable : All) mutation.add(PartitionUpdate.fullPartitionDelete(schemaTable, mutation.key(), timestamp, nowInSec)); + mutation.add(PartitionUpdate.fullPartitionDelete(SystemKeyspace.BuiltMaterializedViews, mutation.key(), timestamp, nowInSec)); + mutation.add(PartitionUpdate.fullPartitionDelete(SystemKeyspace.MaterializedViewsBuildsInProgress, mutation.key(), timestamp, nowInSec)); return mutation; } @@ -830,6 +845,9 @@ public final class SchemaKeyspace for (TriggerMetadata trigger : table.getTriggers()) addTriggerToSchemaMutation(table, trigger, timestamp, mutation); + + for (MaterializedViewDefinition materializedView: table.getMaterializedViews()) + addMaterializedViewToSchemaMutation(table, materializedView, timestamp, mutation); } } @@ -923,6 +941,22 @@ public final class SchemaKeyspace for (TriggerMetadata trigger : triggerDiff.entriesOnlyOnRight().values()) addTriggerToSchemaMutation(newTable, trigger, timestamp, mutation); + MapDifference<String, MaterializedViewDefinition> materializedViewDiff = materializedViewsDiff(oldTable.getMaterializedViews(), newTable.getMaterializedViews()); + + // dropped materialized views + for (MaterializedViewDefinition materializedView : 
materializedViewDiff.entriesOnlyOnLeft().values()) + dropMaterializedViewFromSchemaMutation(oldTable, materializedView, timestamp, mutation); + + // newly created materialized views + for (MaterializedViewDefinition materializedView : materializedViewDiff.entriesOnlyOnRight().values()) + addMaterializedViewToSchemaMutation(oldTable, materializedView, timestamp, mutation); + + // updated materialized views + for (MapDifference.ValueDifference<MaterializedViewDefinition> diff : materializedViewDiff.entriesDiffering().values()) + { + addUpdatedMaterializedViewDefinitionToSchemaMutation(oldTable, diff.rightValue(), timestamp, mutation); + } + return mutation; } @@ -937,6 +971,17 @@ public final class SchemaKeyspace return Maps.difference(beforeMap, afterMap); } + private static MapDifference<String, MaterializedViewDefinition> materializedViewsDiff(MaterializedViews before, MaterializedViews after) + { + Map<String, MaterializedViewDefinition> beforeMap = new HashMap<>(); + before.forEach(v -> beforeMap.put(v.viewName, v)); + + Map<String, MaterializedViewDefinition> afterMap = new HashMap<>(); + after.forEach(v -> afterMap.put(v.viewName, v)); + + return Maps.difference(beforeMap, afterMap); + } + public static Mutation makeDropTableMutation(KeyspaceMetadata keyspace, CFMetaData table, long timestamp) { // Include the serialized keyspace in case the target node missed a CREATE KEYSPACE migration (see CASSANDRA-5631). @@ -950,6 +995,9 @@ public final class SchemaKeyspace for (TriggerMetadata trigger : table.getTriggers()) dropTriggerFromSchemaMutation(table, trigger, timestamp, mutation); + for (MaterializedViewDefinition materializedView : table.getMaterializedViews()) + dropMaterializedViewFromSchemaMutation(table, materializedView, timestamp, mutation); + return mutation; } @@ -1014,8 +1062,12 @@ public final class SchemaKeyspace Triggers triggers = readSchemaPartitionForTableAndApply(TRIGGERS, keyspace, table, SchemaKeyspace::createTriggersFromTriggersPartition); + MaterializedViews views = + readSchemaPartitionForTableAndApply(MATERIALIZED_VIEWS, keyspace, table, SchemaKeyspace::createMaterializedViewsFromMaterializedViewsPartition); + return createTableFromTableRowAndColumns(row, columns).droppedColumns(droppedColumns) - .triggers(triggers); + .triggers(triggers) + .materializedViews(views); } public static CFMetaData createTableFromTableRowAndColumns(UntypedResultSet.Row row, List<ColumnDefinition> columns) @@ -1032,8 +1084,9 @@ public final class SchemaKeyspace boolean isCounter = flags.contains(CFMetaData.Flag.COUNTER); boolean isDense = flags.contains(CFMetaData.Flag.DENSE); boolean isCompound = flags.contains(CFMetaData.Flag.COMPOUND); + boolean isMaterializedView = flags.contains(CFMetaData.Flag.MATERIALIZEDVIEW); - CFMetaData cfm = CFMetaData.create(keyspace, table, id, isDense, isCompound, isSuper, isCounter, columns); + CFMetaData cfm = CFMetaData.create(keyspace, table, id, isDense, isCompound, isSuper, isCounter, isMaterializedView, columns); Map<String, String> compaction = new HashMap<>(row.getTextMap("compaction")); Class<? extends AbstractCompactionStrategy> compactionStrategyClass = @@ -1221,6 +1274,103 @@ public final class SchemaKeyspace } /* + * Materialized view metadata serialization/deserialization. 
+ */ + + private static void addMaterializedViewToSchemaMutation(CFMetaData table, MaterializedViewDefinition materializedView, long timestamp, Mutation mutation) + { + RowUpdateBuilder builder = new RowUpdateBuilder(MaterializedViews, timestamp, mutation) + .clustering(table.cfName, materializedView.viewName); + + for (ColumnIdentifier partitionColumn : materializedView.partitionColumns) + builder.addListEntry("target_columns", partitionColumn.toString()); + for (ColumnIdentifier clusteringColumn : materializedView.clusteringColumns) + builder = builder.addListEntry("clustering_columns", clusteringColumn.toString()); + for (ColumnIdentifier includedColumn : materializedView.included) + builder = builder.addListEntry("included_columns", includedColumn.toString()); + + builder.build(); + } + + private static void dropMaterializedViewFromSchemaMutation(CFMetaData table, MaterializedViewDefinition materializedView, long timestamp, Mutation mutation) + { + RowUpdateBuilder.deleteRow(MaterializedViews, timestamp, mutation, table.cfName, materializedView.viewName); + } + + private static void addUpdatedMaterializedViewDefinitionToSchemaMutation(CFMetaData table, MaterializedViewDefinition materializedView, long timestamp, Mutation mutation) + { + RowUpdateBuilder builder = new RowUpdateBuilder(MaterializedViews, timestamp, mutation) + .clustering(table.cfName, materializedView.viewName); + + builder.resetCollection("target_columns"); + for (ColumnIdentifier partitionColumn : materializedView.partitionColumns) + builder.addListEntry("target_columns", partitionColumn.toString()); + + builder.resetCollection("clustering_columns"); + for (ColumnIdentifier clusteringColumn : materializedView.clusteringColumns) + builder = builder.addListEntry("clustering_columns", clusteringColumn.toString()); + + builder.resetCollection("included_columns"); + for (ColumnIdentifier includedColumn : materializedView.included) + builder = builder.addListEntry("included_columns", includedColumn.toString()); + + builder.build(); + } + + /** + * Deserialize materialized views from storage-level representation. 
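+ * Each row of the partition is converted back into an UntypedResultSet row via QueryProcessor.resultify and
+ * rebuilt into a MaterializedViewDefinition.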
+ * + * @param partition storage-level partition containing the materialized view definitions + * @return the list of processed MaterializedViewDefinitions + */ + private static MaterializedViews createMaterializedViewsFromMaterializedViewsPartition(RowIterator partition) + { + MaterializedViews.Builder views = org.apache.cassandra.schema.MaterializedViews.builder(); + String query = String.format("SELECT * FROM %s.%s", NAME, MATERIALIZED_VIEWS); + for (UntypedResultSet.Row row : QueryProcessor.resultify(query, partition)) + { + MaterializedViewDefinition mv = createMaterializedViewFromMaterializedViewRow(row); + views.add(mv); + } + return views.build(); + } + + private static MaterializedViewDefinition createMaterializedViewFromMaterializedViewRow(UntypedResultSet.Row row) + { + String name = row.getString("view_name"); + List<String> partitionColumnNames = row.getList("target_columns", UTF8Type.instance); + + String cfName = row.getString("columnfamily_name"); + List<String> clusteringColumnNames = row.getList("clustering_columns", UTF8Type.instance); + + List<ColumnIdentifier> partitionColumns = new ArrayList<>(); + for (String columnName : partitionColumnNames) + { + partitionColumns.add(ColumnIdentifier.getInterned(columnName, true)); + } + + List<ColumnIdentifier> clusteringColumns = new ArrayList<>(); + for (String columnName : clusteringColumnNames) + { + clusteringColumns.add(ColumnIdentifier.getInterned(columnName, true)); + } + + List<String> includedColumnNames = row.getList("included_columns", UTF8Type.instance); + Set<ColumnIdentifier> includedColumns = new HashSet<>(); + if (includedColumnNames != null) + { + for (String columnName : includedColumnNames) + includedColumns.add(ColumnIdentifier.getInterned(columnName, true)); + } + + return new MaterializedViewDefinition(cfName, + name, + partitionColumns, + clusteringColumns, + includedColumns); + } + + /* * UDF metadata serialization/deserialization. 
*/ http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/service/AbstractReadExecutor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java index 6696e10..9a57f45 100644 --- a/src/java/org/apache/cassandra/service/AbstractReadExecutor.java +++ b/src/java/org/apache/cassandra/service/AbstractReadExecutor.java @@ -196,7 +196,7 @@ public abstract class AbstractReadExecutor return new SpeculatingReadExecutor(keyspace, cfs, command, consistencyLevel, targetReplicas); } - private static class NeverSpeculatingReadExecutor extends AbstractReadExecutor + public static class NeverSpeculatingReadExecutor extends AbstractReadExecutor { public NeverSpeculatingReadExecutor(Keyspace keyspace, ReadCommand command, ConsistencyLevel consistencyLevel, List<InetAddress> targetReplicas) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java index 8978034..e3ba66e 100644 --- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java @@ -47,7 +47,7 @@ public abstract class AbstractWriteResponseHandler<T> implements IAsyncCallbackW public final ConsistencyLevel consistencyLevel; protected final Runnable callback; protected final Collection<InetAddress> pendingEndpoints; - private final WriteType writeType; + protected final WriteType writeType; private static final AtomicIntegerFieldUpdater<AbstractWriteResponseHandler> failuresUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractWriteResponseHandler.class, "failures"); private volatile int failures = 0; http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java new file mode 100644 index 0000000..ac44923 --- /dev/null +++ b/src/java/org/apache/cassandra/service/BatchlogResponseHandler.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service; + +import java.net.InetAddress; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + +import org.apache.cassandra.exceptions.WriteFailureException; +import org.apache.cassandra.exceptions.WriteTimeoutException; +import org.apache.cassandra.net.MessageIn; + +public class BatchlogResponseHandler<T> extends AbstractWriteResponseHandler<T> +{ + AbstractWriteResponseHandler<T> wrapped; + BatchlogCleanup cleanup; + protected volatile int requiredBeforeFinish; + private static final AtomicIntegerFieldUpdater<BatchlogResponseHandler> requiredBeforeFinishUpdater + = AtomicIntegerFieldUpdater.newUpdater(BatchlogResponseHandler.class, "requiredBeforeFinish"); + + public BatchlogResponseHandler(AbstractWriteResponseHandler<T> wrapped, int requiredBeforeFinish, BatchlogCleanup cleanup) + { + super(wrapped.keyspace, wrapped.naturalEndpoints, wrapped.pendingEndpoints, wrapped.consistencyLevel, wrapped.callback, wrapped.writeType); + this.wrapped = wrapped; + this.requiredBeforeFinish = requiredBeforeFinish; + this.cleanup = cleanup; + } + + protected int ackCount() + { + return wrapped.ackCount(); + } + + public void response(MessageIn<T> msg) + { + wrapped.response(msg); + if (requiredBeforeFinishUpdater.decrementAndGet(this) == 0) + cleanup.run(); + } + + public boolean isLatencyForSnitch() + { + return wrapped.isLatencyForSnitch(); + } + + public void onFailure(InetAddress from) + { + wrapped.onFailure(from); + } + + public void assureSufficientLiveNodes() + { + wrapped.assureSufficientLiveNodes(); + } + + public void get() throws WriteTimeoutException, WriteFailureException + { + wrapped.get(); + } + + protected int totalBlockFor() + { + return wrapped.totalBlockFor(); + } + + protected int totalEndpoints() + { + return wrapped.totalEndpoints(); + } + + protected boolean waitingFor(InetAddress from) + { + return wrapped.waitingFor(from); + } + + protected void signal() + { + wrapped.signal(); + } + + public static class BatchlogCleanup + { + private final BatchlogCleanupCallback callback; + + protected volatile int mutationsWaitingFor; + private static final AtomicIntegerFieldUpdater<BatchlogCleanup> mutationsWaitingForUpdater + = AtomicIntegerFieldUpdater.newUpdater(BatchlogCleanup.class, "mutationsWaitingFor"); + + public BatchlogCleanup(int mutationsWaitingFor, BatchlogCleanupCallback callback) + { + this.mutationsWaitingFor = mutationsWaitingFor; + this.callback = callback; + } + + public void run() + { + if (mutationsWaitingForUpdater.decrementAndGet(this) == 0) + callback.invoke(); + } + } + + public interface BatchlogCleanupCallback + { + void invoke(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/service/CassandraDaemon.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 548cbc7..fddf593 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -287,6 +287,24 @@ public class CassandraDaemon } } + Runnable indexRebuild = new Runnable() + { + @Override + public void run() + { + for (Keyspace keyspace : Keyspace.all()) + { + for (ColumnFamilyStore cf: keyspace.getColumnFamilyStores()) + { + cf.materializedViewManager.buildAllViews(); + } + } + } + }; + + ScheduledExecutors.optionalTasks.schedule(indexRebuild, StorageService.RING_DELAY, 
TimeUnit.MILLISECONDS); + + SystemKeyspace.finishStartup(); // start server internals http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/service/StorageProxy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java index 2c5a2ab..be11c77 100644 --- a/src/java/org/apache/cassandra/service/StorageProxy.java +++ b/src/java/org/apache/cassandra/service/StorageProxy.java @@ -31,6 +31,10 @@ import com.google.common.base.Predicate; import com.google.common.cache.CacheLoader; import com.google.common.collect.*; import com.google.common.util.concurrent.Uninterruptibles; + +import org.apache.cassandra.db.view.MaterializedViewManager; +import org.apache.cassandra.db.view.MaterializedViewUtils; +import org.apache.cassandra.metrics.*; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,7 +62,6 @@ import org.apache.cassandra.locator.AbstractReplicationStrategy; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.locator.LocalStrategy; import org.apache.cassandra.locator.TokenMetadata; -import org.apache.cassandra.metrics.*; import org.apache.cassandra.net.*; import org.apache.cassandra.service.paxos.*; import org.apache.cassandra.tracing.Tracing; @@ -91,6 +94,7 @@ public class StorageProxy implements StorageProxyMBean private static final ClientRequestMetrics writeMetrics = new ClientRequestMetrics("Write"); private static final CASClientRequestMetrics casWriteMetrics = new CASClientRequestMetrics("CASWrite"); private static final CASClientRequestMetrics casReadMetrics = new CASClientRequestMetrics("CASRead"); + private static final MVWriteMetrics mvWriteMetrics = new MVWriteMetrics("MVWrite"); private static final double CONCURRENT_SUBREQUESTS_MARGIN = 0.10; @@ -118,7 +122,7 @@ public class StorageProxy implements StorageProxyMBean throws OverloadedException { assert mutation instanceof Mutation; - sendToHintedEndpoints((Mutation) mutation, targets, responseHandler, localDataCenter); + sendToHintedEndpoints((Mutation) mutation, targets, responseHandler, localDataCenter, Stage.MUTATION); } }; @@ -622,6 +626,80 @@ public class StorageProxy implements StorageProxyMBean Tracing.trace("Wrote hint to satisfy CL.ANY after no replicas acknowledged the write"); } + /** + * Use this method to have these Mutations applied + * across all replicas. 
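+ * Each mutation is paired with the single view natural endpoint computed by
+ * MaterializedViewUtils.getViewNaturalEndpoint; because that base-to-view pairing is one-to-one, the batchlog
+ * entry is stored only on the local node.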
+ * + * @param mutations the mutations to be applied across the replicas + */ + public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations) + throws UnavailableException, OverloadedException, WriteTimeoutException + { + Tracing.trace("Determining replicas for mutation"); + final String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(FBUtilities.getBroadcastAddress()); + + long startTime = System.nanoTime(); + List<WriteResponseHandlerWrapper> wrappers = new ArrayList<>(mutations.size()); + + try + { + Token baseToken = StorageService.getPartitioner().getToken(dataKey); + + ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE; + + //Since the base -> view replication is 1:1 we only need to store the BL locally + final Collection<InetAddress> batchlogEndpoints = Collections.singleton(FBUtilities.getBroadcastAddress()); + final UUID batchUUID = UUIDGen.getTimeUUID(); + BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), + () -> asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID)); + + // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet + for (Mutation mutation : mutations) + { + String keyspaceName = mutation.getKeyspaceName(); + Token tk = mutation.key().getToken(); + List<InetAddress> naturalEndpoints = Lists.newArrayList(MaterializedViewUtils.getViewNaturalEndpoint(keyspaceName, baseToken, tk)); + + WriteResponseHandlerWrapper wrapper = wrapMVBatchResponseHandler(mutation, + consistencyLevel, + consistencyLevel, + naturalEndpoints, + WriteType.BATCH, + cleanup); + + wrappers.add(wrapper); + + //Apply to local batchlog memtable in this thread + BatchlogManager.getBatchlogMutationFor(mutations, batchUUID, MessagingService.current_version).apply(); + } + + // now actually perform the writes and wait for them to complete + asyncWriteBatchedMutations(wrappers, localDataCenter, Stage.MATERIALIZED_VIEW_MUTATION); + } + catch (WriteTimeoutException ex) + { + mvWriteMetrics.timeouts.mark(); + Tracing.trace("Write timeout; received {} of {} required replies", ex.received, ex.blockFor); + throw ex; + } + catch (UnavailableException e) + { + mvWriteMetrics.unavailables.mark(); + Tracing.trace("Unavailable"); + throw e; + } + catch (OverloadedException e) + { + mvWriteMetrics.unavailables.mark(); + Tracing.trace("Overloaded"); + throw e; + } + finally + { + mvWriteMetrics.addNano(System.nanoTime() - startTime); + } + } + @SuppressWarnings("unchecked") public static void mutateWithTriggers(Collection<? 
extends IMutation> mutations, ConsistencyLevel consistencyLevel, @@ -630,12 +708,17 @@ public class StorageProxy implements StorageProxyMBean { Collection<Mutation> augmented = TriggerExecutor.instance.execute(mutations); + boolean updatesView = MaterializedViewManager.updatesAffectView(mutations, true); + if (augmented != null) - mutateAtomically(augmented, consistencyLevel); - else if (mutateAtomically) - mutateAtomically((Collection<Mutation>) mutations, consistencyLevel); + mutateAtomically(augmented, consistencyLevel, updatesView); else - mutate(mutations, consistencyLevel); + { + if (mutateAtomically || updatesView) + mutateAtomically((Collection<Mutation>) mutations, consistencyLevel, updatesView); + else + mutate(mutations, consistencyLevel); + } } /** @@ -646,8 +729,11 @@ public class StorageProxy implements StorageProxyMBean * * @param mutations the Mutations to be applied across the replicas * @param consistency_level the consistency level for the operation + * @param requireQuorumForRemove at least a quorum of nodes will see update before deleting batchlog */ - public static void mutateAtomically(Collection<Mutation> mutations, ConsistencyLevel consistency_level) + public static void mutateAtomically(Collection<Mutation> mutations, + ConsistencyLevel consistency_level, + boolean requireQuorumForRemove) throws UnavailableException, OverloadedException, WriteTimeoutException { Tracing.trace("Determining replicas for atomic batch"); @@ -658,25 +744,49 @@ public class StorageProxy implements StorageProxyMBean try { + + // If we are requiring quorum nodes for removal, we upgrade consistency level to QUORUM unless we already + // require ALL, or EACH_QUORUM. This is so that *at least* QUORUM nodes see the update. + ConsistencyLevel batchConsistencyLevel = requireQuorumForRemove + ? ConsistencyLevel.QUORUM + : consistency_level; + + switch (consistency_level) + { + case ALL: + case EACH_QUORUM: + batchConsistencyLevel = consistency_level; + } + + final Collection<InetAddress> batchlogEndpoints = getBatchlogEndpoints(localDataCenter, batchConsistencyLevel); + final UUID batchUUID = UUIDGen.getTimeUUID(); + BatchlogResponseHandler.BatchlogCleanup cleanup = new BatchlogResponseHandler.BatchlogCleanup(mutations.size(), + new BatchlogResponseHandler.BatchlogCleanupCallback() + { + public void invoke() + { + asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID); + } + }); + // add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet for (Mutation mutation : mutations) { - WriteResponseHandlerWrapper wrapper = wrapResponseHandler(mutation, consistency_level, WriteType.BATCH); + WriteResponseHandlerWrapper wrapper = wrapBatchResponseHandler(mutation, + consistency_level, + batchConsistencyLevel, + WriteType.BATCH, + cleanup); // exit early if we can't fulfill the CL at this time. 
wrapper.handler.assureSufficientLiveNodes(); wrappers.add(wrapper); } // write to the batchlog - Collection<InetAddress> batchlogEndpoints = getBatchlogEndpoints(localDataCenter, consistency_level); - UUID batchUUID = UUIDGen.getTimeUUID(); syncWriteToBatchlog(mutations, batchlogEndpoints, batchUUID); // now actually perform the writes and wait for them to complete - syncWriteBatchedMutations(wrappers, localDataCenter); - - // remove the batchlog entries asynchronously - asyncRemoveFromBatchlog(batchlogEndpoints, batchUUID); + syncWriteBatchedMutations(wrappers, localDataCenter, Stage.MUTATION); } catch (UnavailableException e) { @@ -719,13 +829,13 @@ public class StorageProxy implements StorageProxyMBean WriteType.BATCH_LOG); MessageOut<Mutation> message = BatchlogManager.getBatchlogMutationFor(mutations, uuid, MessagingService.current_version) - .createMessage(); + .createMessage(MessagingService.Verb.BATCHLOG_MUTATION); for (InetAddress target : endpoints) { int targetVersion = MessagingService.instance().getVersion(target); if (canDoLocalRequest(target)) { - insertLocal(message.payload, handler); + insertLocal(Stage.BATCHLOG_MUTATION, message.payload, handler); } else if (targetVersion == MessagingService.current_version) { @@ -734,7 +844,7 @@ public class StorageProxy implements StorageProxyMBean else { MessagingService.instance().sendRR(BatchlogManager.getBatchlogMutationFor(mutations, uuid, targetVersion) - .createMessage(), + .createMessage(MessagingService.Verb.BATCHLOG_MUTATION), target, handler, false); @@ -754,25 +864,43 @@ public class StorageProxy implements StorageProxyMBean WriteType.SIMPLE); Mutation mutation = new Mutation(SystemKeyspace.NAME, StorageService.getPartitioner().decorateKey(UUIDType.instance.decompose(uuid))); mutation.add(PartitionUpdate.fullPartitionDelete(SystemKeyspace.Batchlog, mutation.key(), FBUtilities.timestampMicros(), FBUtilities.nowInSeconds())); - MessageOut<Mutation> message = mutation.createMessage(); + MessageOut<Mutation> message = mutation.createMessage(MessagingService.Verb.BATCHLOG_MUTATION); for (InetAddress target : endpoints) { if (canDoLocalRequest(target)) - insertLocal(message.payload, handler); + insertLocal(Stage.BATCHLOG_MUTATION, message.payload, handler); else MessagingService.instance().sendRR(message, target, handler, false); } } - private static void syncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, String localDataCenter) + private static void asyncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, String localDataCenter, Stage stage) + { + for (WriteResponseHandlerWrapper wrapper : wrappers) + { + Iterable<InetAddress> endpoints = Iterables.concat(wrapper.handler.naturalEndpoints, wrapper.handler.pendingEndpoints); + + try + { + sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter, stage); + } + catch (OverloadedException | WriteTimeoutException e) + { + wrapper.handler.onFailure(FBUtilities.getBroadcastAddress()); + } + } + } + + private static void syncWriteBatchedMutations(List<WriteResponseHandlerWrapper> wrappers, String localDataCenter, Stage stage) throws WriteTimeoutException, OverloadedException { for (WriteResponseHandlerWrapper wrapper : wrappers) { Iterable<InetAddress> endpoints = Iterables.concat(wrapper.handler.naturalEndpoints, wrapper.handler.pendingEndpoints); - sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter); + sendToHintedEndpoints(wrapper.mutation, endpoints, wrapper.handler, localDataCenter, stage); } 
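+ // block until every batched mutation has been acknowledged by enough replicas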

+        for (WriteResponseHandlerWrapper wrapper : wrappers)
             wrapper.handler.get();
     }

@@ -815,25 +943,52 @@ public class StorageProxy implements StorageProxyMBean
         return responseHandler;
     }

-    // same as above except does not initiate writes (but does perform availability checks).
-    private static WriteResponseHandlerWrapper wrapResponseHandler(Mutation mutation, ConsistencyLevel consistency_level, WriteType writeType)
+    // same as performWrites except does not initiate writes (but does perform availability checks).
+    private static WriteResponseHandlerWrapper wrapBatchResponseHandler(Mutation mutation,
+                                                                        ConsistencyLevel consistency_level,
+                                                                        ConsistencyLevel batchConsistencyLevel,
+                                                                        WriteType writeType,
+                                                                        BatchlogResponseHandler.BatchlogCleanup cleanup)
     {
-        AbstractReplicationStrategy rs = Keyspace.open(mutation.getKeyspaceName()).getReplicationStrategy();
+        Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
+        AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
         String keyspaceName = mutation.getKeyspaceName();
         Token tk = mutation.key().getToken();
         List<InetAddress> naturalEndpoints = StorageService.instance.getNaturalEndpoints(keyspaceName, tk);
         Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
-        AbstractWriteResponseHandler<IMutation> responseHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType);
-        return new WriteResponseHandlerWrapper(responseHandler, mutation);
+        AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType);
+        BatchlogResponseHandler<IMutation> batchHandler = new BatchlogResponseHandler<>(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup);
+        return new WriteResponseHandlerWrapper(batchHandler, mutation);
+    }
+
+    /**
+     * Same as performWrites except does not initiate writes (but does perform availability checks).
+     * Keeps track of MVWriteMetrics
+     */
+    private static WriteResponseHandlerWrapper wrapMVBatchResponseHandler(Mutation mutation,
+                                                                          ConsistencyLevel consistency_level,
+                                                                          ConsistencyLevel batchConsistencyLevel,
+                                                                          List<InetAddress> naturalEndpoints,
+                                                                          WriteType writeType,
+                                                                          BatchlogResponseHandler.BatchlogCleanup cleanup)
+    {
+        Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
+        AbstractReplicationStrategy rs = keyspace.getReplicationStrategy();
+        String keyspaceName = mutation.getKeyspaceName();
+        Token tk = mutation.key().getToken();
+        Collection<InetAddress> pendingEndpoints = StorageService.instance.getTokenMetadata().pendingEndpointsFor(tk, keyspaceName);
+        AbstractWriteResponseHandler<IMutation> writeHandler = rs.getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, null, writeType);
+        BatchlogResponseHandler<IMutation> batchHandler = new MVWriteMetricsWrapped(writeHandler, batchConsistencyLevel.blockFor(keyspace), cleanup);
+        return new WriteResponseHandlerWrapper(batchHandler, mutation);
     }

     // used by atomic_batch_mutate to decouple availability check from the write itself, caches consistency level and endpoints.
     private static class WriteResponseHandlerWrapper
     {
-        final AbstractWriteResponseHandler<IMutation> handler;
+        final BatchlogResponseHandler<IMutation> handler;
         final Mutation mutation;

-        WriteResponseHandlerWrapper(AbstractWriteResponseHandler<IMutation> handler, Mutation mutation)
+        WriteResponseHandlerWrapper(BatchlogResponseHandler<IMutation> handler, Mutation mutation)
         {
             this.handler = handler;
             this.mutation = mutation;
@@ -886,7 +1041,8 @@ public class StorageProxy implements StorageProxyMBean
     public static void sendToHintedEndpoints(final Mutation mutation,
                                              Iterable<InetAddress> targets,
                                              AbstractWriteResponseHandler<IMutation> responseHandler,
-                                             String localDataCenter)
+                                             String localDataCenter,
+                                             Stage stage)
     throws OverloadedException
     {
         // extra-datacenter replicas, grouped by dc
@@ -950,7 +1106,7 @@ public class StorageProxy implements StorageProxyMBean
             }

             if (insertLocal)
-                insertLocal(mutation, responseHandler);
+                insertLocal(stage, mutation, responseHandler);

             if (dcGroups != null)
             {
@@ -1059,10 +1215,9 @@ public class StorageProxy implements StorageProxyMBean
         }
     }

-    private static void insertLocal(final Mutation mutation, final AbstractWriteResponseHandler<IMutation> responseHandler)
+    private static void insertLocal(Stage stage, final Mutation mutation, final AbstractWriteResponseHandler<IMutation> responseHandler)
     {
-
-        StageManager.getStage(Stage.MUTATION).maybeExecuteImmediately(new LocalMutationRunnable()
+        StageManager.getStage(stage).maybeExecuteImmediately(new LocalMutationRunnable()
         {
             public void runMayThrow()
             {
@@ -1073,7 +1228,8 @@ public class StorageProxy implements StorageProxyMBean
                 }
                 catch (Exception ex)
                 {
-                    logger.error("Failed to apply mutation locally : {}", ex);
+                    if (!(ex instanceof WriteTimeoutException))
+                        logger.error("Failed to apply mutation locally : {}", ex);
                     responseHandler.onFailure(FBUtilities.getBroadcastAddress());
                 }
             }
@@ -1193,7 +1349,7 @@ public class StorageProxy implements StorageProxyMBean
                 Set<InetAddress> remotes = Sets.difference(ImmutableSet.copyOf(targets), ImmutableSet.of(FBUtilities.getBroadcastAddress()));
                 if (!remotes.isEmpty())
-                    sendToHintedEndpoints(result, remotes, responseHandler, localDataCenter);
+                    sendToHintedEndpoints(result, remotes, responseHandler, localDataCenter, Stage.COUNTER_MUTATION);
             }
         };
     }
@@ -2123,6 +2279,24 @@ public class StorageProxy implements StorageProxyMBean
     }

     /**
+     * This class captures metrics for materialized view writes.
+     */
+    private static class MVWriteMetricsWrapped extends BatchlogResponseHandler<IMutation>
+    {
+        public MVWriteMetricsWrapped(AbstractWriteResponseHandler<IMutation> writeHandler, int i, BatchlogCleanup cleanup)
+        {
+            super(writeHandler, i, cleanup);
+            mvWriteMetrics.viewReplicasAttempted.inc(totalEndpoints());
+        }
+
+        public void response(MessageIn<IMutation> msg)
+        {
+            super.response(msg);
+            mvWriteMetrics.viewReplicasSuccess.inc();
+        }
+    }
+
+    /**
      * A Runnable that aborts if it doesn't start running before it times out
      */
     private static abstract class DroppableRunnable implements Runnable
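The StorageProxy changes above share one theme: sendToHintedEndpoints and insertLocal now take an explicit Stage, so batchlog writes (BATCHLOG_MUTATION) and materialized view writes (MATERIALIZED_VIEW_MUTATION) run on their own executors instead of queueing behind ordinary writes on the MUTATION stage that triggered them. The following is a minimal, self-contained sketch of that routing idea only; StageRoutingSketch, its pool sizes, and this insertLocal signature are illustrative stand-ins, not Cassandra's API.

import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StageRoutingSketch
{
    enum Stage { MUTATION, BATCHLOG_MUTATION, MATERIALIZED_VIEW_MUTATION, COUNTER_MUTATION }

    // One executor per stage, analogous to StageManager's stage pools.
    private static final Map<Stage, ExecutorService> STAGES = new EnumMap<>(Stage.class);
    static
    {
        for (Stage s : Stage.values())
            STAGES.put(s, Executors.newFixedThreadPool(2)); // pool sizes are illustrative only
    }

    // Counterpart of the patched insertLocal(Stage, Mutation, handler): the caller
    // chooses the stage, so a batchlog or view write never shares a queue with
    // the plain mutations that produced it.
    static void insertLocal(Stage stage, Runnable applyMutation)
    {
        STAGES.get(stage).execute(applyMutation);
    }

    public static void main(String[] args)
    {
        insertLocal(Stage.BATCHLOG_MUTATION, () -> System.out.println("batchlog mutation applied"));
        insertLocal(Stage.MATERIALIZED_VIEW_MUTATION, () -> System.out.println("view mutation applied"));
        STAGES.values().forEach(ExecutorService::shutdown);
    }
}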
http://git-wip-us.apache.org/repos/asf/cassandra/blob/3bdcaa33/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index da53bf7..5049337 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -297,6 +297,8 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         /* register the verb handlers */
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MUTATION, new MutationVerbHandler());
+        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.BATCHLOG_MUTATION, new MutationVerbHandler());
+        MessagingService.instance().registerVerbHandlers(MessagingService.Verb.MATERIALIZED_VIEW_MUTATION, new MutationVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ_REPAIR, new ReadRepairVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.READ, new ReadCommandVerbHandler());
         MessagingService.instance().registerVerbHandlers(MessagingService.Verb.RANGE_SLICE, new ReadCommandVerbHandler());
@@ -629,9 +631,14 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
             public void runMayThrow() throws InterruptedException
             {
                 inShutdownHook = true;
+                ExecutorService materializedViewMutationStage = StageManager.getStage(Stage.MATERIALIZED_VIEW_MUTATION);
+                ExecutorService batchlogMutationStage = StageManager.getStage(Stage.BATCHLOG_MUTATION);
                 ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
                 ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
-                if (mutationStage.isShutdown() && counterMutationStage.isShutdown())
+                if (mutationStage.isShutdown()
+                    && counterMutationStage.isShutdown()
+                    && batchlogMutationStage.isShutdown()
+                    && materializedViewMutationStage.isShutdown())
                     return; // drained already

                 if (daemon != null)
@@ -642,8 +649,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 // In-progress writes originating here could generate hints to be written, so shut down MessagingService
                 // before mutation stage, so we can get all the hints saved before shutting down
                 MessagingService.instance().shutdown();
+                materializedViewMutationStage.shutdown();
+                batchlogMutationStage.shutdown();
                 counterMutationStage.shutdown();
                 mutationStage.shutdown();
+                materializedViewMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
+                batchlogMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
                 counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
                 mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
                 StorageProxy.instance.verifyNoHintsInProgress();
@@ -3820,8 +3831,13 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         inShutdownHook = true;
         ExecutorService counterMutationStage = StageManager.getStage(Stage.COUNTER_MUTATION);
+        ExecutorService batchlogMutationStage = StageManager.getStage(Stage.BATCHLOG_MUTATION);
+        ExecutorService materializedViewMutationStage = StageManager.getStage(Stage.MATERIALIZED_VIEW_MUTATION);
         ExecutorService mutationStage = StageManager.getStage(Stage.MUTATION);
-        if (mutationStage.isTerminated() && counterMutationStage.isTerminated())
+        if (mutationStage.isTerminated()
+            && counterMutationStage.isTerminated()
+            && batchlogMutationStage.isTerminated()
+            && materializedViewMutationStage.isTerminated())
         {
             logger.warn("Cannot drain node (did it already happen?)");
             return;
@@ -3835,8 +3851,12 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
         MessagingService.instance().shutdown();

         setMode(Mode.DRAINING, "clearing mutation stage", false);
+        materializedViewMutationStage.shutdown();
+        batchlogMutationStage.shutdown();
         counterMutationStage.shutdown();
         mutationStage.shutdown();
+        materializedViewMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
+        batchlogMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
         counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
         mutationStage.awaitTermination(3600, TimeUnit.SECONDS);
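The StorageService hunks extend both the shutdown hook and drain() with the two new stages while preserving the existing ordering: MessagingService stops first so hints can be saved, then every write stage is shut down, then each stage's termination is awaited. A compact sketch of that two-phase drain pattern using plain java.util.concurrent executors; DrainOrderSketch and its names are hypothetical, not Cassandra classes.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DrainOrderSketch
{
    // Phase 1: stop accepting new work on every stage; phase 2: wait for the
    // queued work on each stage to finish, mirroring the shutdown()/awaitTermination()
    // sequence in the hunks above (the patch allows 3600 seconds per stage).
    static void drainStages(List<ExecutorService> stages) throws InterruptedException
    {
        for (ExecutorService stage : stages)
            stage.shutdown();

        for (ExecutorService stage : stages)
            if (!stage.awaitTermination(3600, TimeUnit.SECONDS))
                throw new IllegalStateException("a stage failed to drain in time");
    }

    public static void main(String[] args) throws InterruptedException
    {
        ExecutorService view = Executors.newSingleThreadExecutor();
        ExecutorService batchlog = Executors.newSingleThreadExecutor();
        ExecutorService mutation = Executors.newSingleThreadExecutor();
        mutation.execute(() -> System.out.println("in-flight write completes before drain returns"));
        drainStages(Arrays.asList(view, batchlog, mutation));
        System.out.println("drained");
    }
}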

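MVWriteMetricsWrapped, added at the end of the StorageProxy diff above, is a decorator: it records every view replica it will attempt at construction time and records a success each time a replica acknowledges. A simplified, runnable sketch of the same pattern; ResponseHandler, MVMetricsHandler, and the AtomicLong counters are stand-ins for Cassandra's response handler and metrics types.

import java.util.concurrent.atomic.AtomicLong;

public class MetricsWrappedHandlerSketch
{
    static final AtomicLong viewReplicasAttempted = new AtomicLong();
    static final AtomicLong viewReplicasSuccess = new AtomicLong();

    // Simplified stand-in for the wrapped write response handler.
    static class ResponseHandler
    {
        private final int totalEndpoints;
        ResponseHandler(int totalEndpoints) { this.totalEndpoints = totalEndpoints; }
        int totalEndpoints() { return totalEndpoints; }
        void response() { /* base acknowledgement bookkeeping would go here */ }
    }

    // Decorator in the style of MVWriteMetricsWrapped: count attempts up front,
    // count successes as replica responses arrive.
    static class MVMetricsHandler extends ResponseHandler
    {
        MVMetricsHandler(int totalEndpoints)
        {
            super(totalEndpoints);
            viewReplicasAttempted.addAndGet(totalEndpoints()); // every replica we will try
        }

        @Override
        void response()
        {
            super.response();
            viewReplicasSuccess.incrementAndGet(); // one replica acked
        }
    }

    public static void main(String[] args)
    {
        MVMetricsHandler handler = new MVMetricsHandler(3);
        handler.response();
        System.out.println(viewReplicasAttempted.get() + " attempted, " + viewReplicasSuccess.get() + " succeeded");
    }
}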