This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cep-21-tcm
in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit dca3895684d45173e468e471fccdba9c4ae5056b
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Thu Mar 2 19:07:12 2023 +0000

    [CEP-21] Consistent read/write path (3/7)

    Part 3 of 7 adds the ability to detect version mismatches between peers on the
    read/write path and to handle such divergence. Lagging peers will attempt to
    catch up from the CMS if the coordinator in a read/write operation has seen
    newer metadata. Coordinators may fail writes if the cluster metadata changes
    while the write is in flight and the consistency level can no longer be
    satisfied by the original replica plan.

    Co-authored-by: Marcus Eriksson <[email protected]>
    Co-authored-by: Alex Petrov <[email protected]>
    Co-authored-by: Sam Tunnicliffe <[email protected]>
---
 src/java/org/apache/cassandra/db/ReadCommand.java  |   9 +
 .../cassandra/db/ReadCommandVerbHandler.java       |  80 +++--
 src/java/org/apache/cassandra/db/ReadResponse.java |   7 +
 .../cassandra/db/partitions/PartitionUpdate.java   |  18 ++
 .../org/apache/cassandra/locator/ReplicaPlan.java  | 271 ++++++++++++++--
 .../org/apache/cassandra/locator/ReplicaPlans.java | 350 ++++++++++++++++-----
 .../apache/cassandra/net/ResponseVerbHandler.java  |  14 +
 .../service/AbstractWriteResponseHandler.java      |   4 +
 .../cassandra/service/WriteResponseHandler.java    |  12 +
 .../org/apache/cassandra/service/paxos/Paxos.java  |  34 +-
 .../cassandra/service/reads/ReadCallback.java      |  35 +--
 .../service/reads/ReplicaFilteringProtection.java  |   3 +-
 .../cassandra/service/reads/ResponseResolver.java  |   1 -
 .../service/reads/range/ReplicaPlanIterator.java   |   7 +-
 .../reads/repair/BlockingPartitionRepair.java      |  54 ++--
 .../service/reads/repair/BlockingReadRepair.java   |   9 +-
 .../service/reads/repair/NoopReadRepair.java       |   2 +-
 .../service/reads/repair/ReadOnlyReadRepair.java   |   2 +-
 .../cassandra/service/reads/repair/ReadRepair.java |   2 +-
 .../reads/repair/RowIteratorMergeListener.java     |  20 +-
 20 files changed, 744 insertions(+), 190 deletions(-)

diff --git a/src/java/org/apache/cassandra/db/ReadCommand.java b/src/java/org/apache/cassandra/db/ReadCommand.java index 0978b8ef05..de218d9b67 100644 --- a/src/java/org/apache/cassandra/db/ReadCommand.java +++ b/src/java/org/apache/cassandra/db/ReadCommand.java @@ -38,6 +38,7 @@ import org.apache.cassandra.config.*; import org.apache.cassandra.db.filter.*; import org.apache.cassandra.exceptions.QueryCancelledException; import org.apache.cassandra.net.MessageFlag; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.ParamType; import org.apache.cassandra.net.Verb; import org.apache.cassandra.db.partitions.*; @@ -66,6 +67,7 @@ import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.SchemaProvider; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.ClientWarn; +import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tracing.Tracing; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.ObjectSizes; @@ -1070,6 +1072,8 @@ public abstract class ReadCommand extends AbstractReadQuery if (command.isDigestQuery()) out.writeUnsignedVInt32(command.digestVersion()); command.metadata().id.serialize(out); + if (version >= MessagingService.VERSION_50) + Epoch.serializer.serialize(command.metadata().epoch, out); out.writeInt(command.nowInSec()); ColumnFilter.serializer.serialize(command.columnFilter(), out, version); RowFilter.serializer.serialize(command.rowFilter(), out, version); @@ -1097,6 +1101,10 @@ public abstract class ReadCommand extends
AbstractReadQuery boolean hasIndex = hasIndex(flags); int digestVersion = isDigest ? in.readUnsignedVInt32() : 0; TableMetadata metadata = schema.getExistingTableMetadata(TableId.deserialize(in)); + Epoch schemaVersion = null; + if (version >= MessagingService.VERSION_50) + schemaVersion = Epoch.serializer.deserialize(in); + assert schemaVersion == null || metadata.epoch.equals(schemaVersion) : metadata.epoch + " " + schemaVersion; // TODO: handle etc int nowInSec = in.readInt(); ColumnFilter columnFilter = ColumnFilter.serializer.deserialize(in, version, metadata); RowFilter rowFilter = RowFilter.serializer.deserialize(in, version, metadata); @@ -1128,6 +1136,7 @@ public abstract class ReadCommand extends AbstractReadQuery return 2 // kind + flags + (command.isDigestQuery() ? TypeSizes.sizeofUnsignedVInt(command.digestVersion()) : 0) + command.metadata().id.serializedSize() + + (version >= MessagingService.VERSION_50 ? Epoch.serializer.serializedSize(command.metadata().epoch) : 0) + TypeSizes.sizeof(command.nowInSec()) + ColumnFilter.serializer.serializedSize(command.columnFilter(), version) + RowFilter.serializer.serializedSize(command.rowFilter(), version) diff --git a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java index f693bbc7c3..0aa295422f 100644 --- a/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java +++ b/src/java/org/apache/cassandra/db/ReadCommandVerbHandler.java @@ -22,15 +22,19 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; +import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Token; import org.apache.cassandra.exceptions.InvalidRequestException; import org.apache.cassandra.exceptions.QueryCancelledException; import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.net.IVerbHandler; import org.apache.cassandra.net.Message; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tracing.Tracing; +import org.apache.cassandra.utils.FBUtilities; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -42,13 +46,15 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand> public void doVerb(Message<ReadCommand> message) { + ClusterMetadataService.instance().maybeCatchup(message.epoch()); + if (StorageService.instance.isBootstrapMode()) - { throw new RuntimeException("Cannot service reads while bootstrapping!"); - } ReadCommand command = message.payload; - validateTransientStatus(message); + ClusterMetadata metadata = ClusterMetadata.current(); + + checkTokenOwnership(metadata, message); MessageParams.reset(); long timeout = message.expiresAtNanos() - message.createdAtNanos(); @@ -103,37 +109,61 @@ public class ReadCommandVerbHandler implements IVerbHandler<ReadCommand> } } - private void validateTransientStatus(Message<ReadCommand> message) + private void checkTokenOwnership(ClusterMetadata metadata, Message<ReadCommand> message) { ReadCommand command = message.payload; if (command.metadata().isVirtual()) return; - Token token; if (command instanceof SinglePartitionReadCommand) - token = ((SinglePartitionReadCommand) command).partitionKey().getToken(); - else - token = ((PartitionRangeReadCommand) 
command).dataRange().keyRange().right.getToken(); - - Replica replica = Keyspace.open(command.metadata().keyspace) - .getReplicationStrategy() - .getLocalReplicaFor(token); - - if (replica == null) { - logger.warn("Received a read request from {} for a range that is not owned by the current replica {}.", - message.from(), - command); - return; + Token token = ((SinglePartitionReadCommand) command).partitionKey().getToken(); + Replica localReplica = getLocalReplica(metadata, token, command.metadata().keyspace); + if (localReplica == null) + { + throw new InvalidRequestException(String.format("Received a read request from %s for a token %s that is not owned by the current replica as of %s: %s.", + message.from(), token, metadata.epoch, message.payload)); + } + + if (!command.acceptsTransient() && localReplica.isTransient()) + { + MessagingService.instance().metrics.recordDroppedMessage(message, message.elapsedSinceCreated(NANOSECONDS), NANOSECONDS); + throw new InvalidRequestException(String.format("Attempted to serve %s data request from %s node in %s", + command.acceptsTransient() ? "transient" : "full", + localReplica.isTransient() ? "transient" : "full", + this)); + } } - - if (!command.acceptsTransient() && replica.isTransient()) + else { - MessagingService.instance().metrics.recordDroppedMessage(message, message.elapsedSinceCreated(NANOSECONDS), NANOSECONDS); - throw new InvalidRequestException(String.format("Attempted to serve %s data request from %s node in %s", - command.acceptsTransient() ? "transient" : "full", - replica.isTransient() ? "transient" : "full", - this)); + AbstractBounds<PartitionPosition> range = ((PartitionRangeReadCommand) command).dataRange().keyRange(); + + // TODO: preexisting issue: for the range queries or queries that span multiple replicas, we can only make requests where the right token is owned, but not the left one + Replica maxTokenLocalReplica = getLocalReplica(metadata, range.right.getToken(), command.metadata().keyspace); + if (maxTokenLocalReplica == null) + { + throw new InvalidRequestException(String.format("Received a read request from %s for a range [%s,%s] that is not owned by the current replica as of %s: %s.", + message.from(), range.left, range.right, metadata.epoch, message.payload)); + } + + // TODO: preexisting issue: we should change the whole range for transient-ness, not just the right token + if (command.acceptsTransient() != maxTokenLocalReplica.isTransient()) + { + MessagingService.instance().metrics.recordDroppedMessage(message, message.elapsedSinceCreated(NANOSECONDS), NANOSECONDS); + throw new InvalidRequestException(String.format("Attempted to serve %s data request from %s node in %s", + command.acceptsTransient() ? "transient" : "full", + maxTokenLocalReplica.isTransient() ? 
"transient" : "full", + this)); + } } } + + private static Replica getLocalReplica(ClusterMetadata metadata, Token token, String keyspace) + { + return metadata.placements + .get(metadata.schema.getKeyspaces().getNullable(keyspace).params.replication) + .reads + .forToken(token) + .lookup(FBUtilities.getBroadcastAddressAndPort()); + } } diff --git a/src/java/org/apache/cassandra/db/ReadResponse.java b/src/java/org/apache/cassandra/db/ReadResponse.java index 9ef9128a36..8e5e36b176 100644 --- a/src/java/org/apache/cassandra/db/ReadResponse.java +++ b/src/java/org/apache/cassandra/db/ReadResponse.java @@ -34,6 +34,8 @@ import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.utils.ByteBufferUtil; +import static org.apache.cassandra.db.RepairedDataInfo.NO_OP_REPAIRED_DATA_INFO; + public abstract class ReadResponse { // Serializer for single partition read response @@ -48,6 +50,11 @@ public abstract class ReadResponse return new LocalDataResponse(data, command, rdi); } + public static ReadResponse createDataResponse(UnfilteredPartitionIterator data, ReadCommand command) + { + return new LocalDataResponse(data, command, NO_OP_REPAIRED_DATA_INFO); + } + public static ReadResponse createSimpleDataResponse(UnfilteredPartitionIterator data, ColumnFilter selection) { return new LocalDataResponse(data, selection); diff --git a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java index 7f2ca7d465..0180818d19 100644 --- a/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java +++ b/src/java/org/apache/cassandra/db/partitions/PartitionUpdate.java @@ -35,11 +35,14 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.rows.*; +import org.apache.cassandra.exceptions.IncompatibleSchemaException; import org.apache.cassandra.index.IndexRegistry; import org.apache.cassandra.io.util.DataInputBuffer; import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputBuffer; import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.schema.ColumnMetadata; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.TableId; @@ -49,6 +52,7 @@ import org.apache.cassandra.utils.btree.UpdateFunction; import org.apache.cassandra.utils.vint.VIntCoding; import static org.apache.cassandra.db.rows.UnfilteredRowIteratorSerializer.IS_EMPTY; +import static org.apache.cassandra.tcm.Epoch.FIRST; /** * Stores updates made on a partition. @@ -716,6 +720,8 @@ public class PartitionUpdate extends AbstractBTreePartition assert !iter.isReverseOrder(); update.metadata.id.serialize(out); + if (version >= MessagingService.VERSION_50) + Epoch.serializer.serialize(update.metadata.epoch != null ? 
update.metadata.epoch : Epoch.EMPTY, out); UnfilteredRowIteratorSerializer.serializer.serialize(iter, null, out, version, update.rowCount()); } } @@ -723,6 +729,17 @@ public class PartitionUpdate extends AbstractBTreePartition public PartitionUpdate deserialize(DataInputPlus in, int version, DeserializationHelper.Flag flag) throws IOException { TableMetadata metadata = Schema.instance.getExistingTableMetadata(TableId.deserialize(in)); + Epoch remoteVersion = Epoch.EMPTY; + if (version >= MessagingService.VERSION_50) + remoteVersion = Epoch.serializer.deserialize(in); + if (remoteVersion != null && remoteVersion.isBefore(FIRST) && !remoteVersion.equals(metadata.epoch)) + { + // This exception should never be thrown under normal condition. By the time partition update is serialized, + // replica should be able to fully catch up with coordinator. + throw new IncompatibleSchemaException(String.format("Incompatible table schema version. Our version: %s. Coordinator version: %s.", + metadata.epoch, remoteVersion)); + } + UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(metadata, null, in, version, flag); if (header.isEmpty) return emptyUpdate(metadata, header.key); @@ -774,6 +791,7 @@ public class PartitionUpdate extends AbstractBTreePartition try (UnfilteredRowIterator iter = update.unfilteredIterator()) { return update.metadata.id.serializedSize() + + (version >= MessagingService.VERSION_50 ? Epoch.serializer.serializedSize(update.metadata.epoch) : 0) + UnfilteredRowIteratorSerializer.serializer.serializedSize(iter, null, version, update.rowCount()); } } diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlan.java b/src/java/org/apache/cassandra/locator/ReplicaPlan.java index 3bb3ec0222..b5ac5ceec3 100644 --- a/src/java/org/apache/cassandra/locator/ReplicaPlan.java +++ b/src/java/org/apache/cassandra/locator/ReplicaPlan.java @@ -23,7 +23,15 @@ import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.dht.AbstractBounds; - +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.utils.FBUtilities; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Function; import java.util.function.Predicate; import java.util.function.Supplier; @@ -38,6 +46,10 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P> Replica lookup(InetAddressAndPort endpoint); P withContacts(E contacts); + void collectSuccess(InetAddressAndPort inetAddressAndPort); + void collectFailure(InetAddressAndPort inetAddressAndPort, RequestFailureReason t); + boolean stillAppliesTo(ClusterMetadata newMetadata); + interface ForRead<E extends Endpoints<E>, P extends ReplicaPlan.ForRead<E, P>> extends ReplicaPlan<E, P> { int readQuorum(); @@ -57,6 +69,7 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P> // It could be different than the one fetched from Keyspace later, e.g. RS altered during the query. // Use the snapshot to calculate {@code blockFor} in order to have a consistent view of RS for the query. 
protected final AbstractReplicationStrategy replicationStrategy; + protected final Epoch epoch; // all nodes we will contact via any mechanism, including hints // i.e., for: @@ -66,22 +79,43 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P> // ==> liveAndDown.natural().filter(isFull) ++ liveAndDown.pending() ++ live.natural.filter(isTransient, req) // - paxos, includes all live replicas (natural+pending), for this DC if SERIAL_LOCAL // ==> live.all() (if consistencyLevel.isDCLocal(), then .filter(consistencyLevel.isLocal)) - private final E contacts; + protected final E contacts; + + protected final Function<ClusterMetadata, P> recompute; + protected List<InetAddressAndPort> contacted = new CopyOnWriteArrayList<>(); - AbstractReplicaPlan(Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, E contacts) + AbstractReplicaPlan(Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, E contacts, Function<ClusterMetadata, P> recompute, Epoch epoch) { assert contacts != null; this.keyspace = keyspace; this.replicationStrategy = replicationStrategy; this.consistencyLevel = consistencyLevel; this.contacts = contacts; + this.epoch = epoch; + this.recompute = recompute; } public E contacts() { return contacts; } - public Keyspace keyspace() { return keyspace; } public AbstractReplicationStrategy replicationStrategy() { return replicationStrategy; } public ConsistencyLevel consistencyLevel() { return consistencyLevel; } + public boolean canDoLocalRequest() + { + return contacts.contains(FBUtilities.getBroadcastAddressAndPort()); + } + + public Epoch epoch() + { + return epoch; + } + + public void collectSuccess(InetAddressAndPort addr) + { + contacted.add(addr); + } + + public void collectFailure(InetAddressAndPort inetAddressAndPort, RequestFailureReason t) {} + } public static abstract class AbstractForRead<E extends Endpoints<E>, P extends ForRead<E, P>> extends AbstractReplicaPlan<E, P> implements ForRead<E, P> @@ -89,14 +123,22 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P> // all nodes we *could* contacts; typically all natural replicas that are believed to be alive // we will consult this collection to find uncontacted nodes we might contact if we doubt we will meet consistency level final E candidates; - - AbstractForRead(Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, E candidates, E contacts) + final int readQuorum; + + AbstractForRead(Keyspace keyspace, + AbstractReplicationStrategy replicationStrategy, + ConsistencyLevel consistencyLevel, + E candidates, + E contacts, + Function<ClusterMetadata, P> recompute, + Epoch epoch) { - super(keyspace, replicationStrategy, consistencyLevel, contacts); + super(keyspace, replicationStrategy, consistencyLevel, contacts, recompute, epoch); this.candidates = candidates; + this.readQuorum = consistencyLevel.blockFor(replicationStrategy); } - public int readQuorum() { return consistencyLevel.blockFor(replicationStrategy); } + public int readQuorum() { return readQuorum; } public E readCandidates() { return candidates; } @@ -114,27 +156,77 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P> { return "ReplicaPlan.ForRead [ CL: " + consistencyLevel + " keyspace: " + keyspace + " candidates: " + candidates + " contacts: " + contacts() + " ]"; } + + @Override + public boolean stillAppliesTo(ClusterMetadata 
newMetadata) + { + if (newMetadata.epoch.equals(epoch)) + return true; + + // If we can't decide, return. + if (recompute == null) + return true; + + ForRead<?, ?> newPlan = recompute.apply(newMetadata); + + if (readCandidates().equals(newPlan.readCandidates())) + return true; + + int readQuorum = newPlan.readQuorum(); + for (InetAddressAndPort addr : contacted) + { + if (newPlan.readCandidates().contains(addr)) + readQuorum--; + } + + if (readQuorum <= 0) + return true; + + throw new IllegalStateException(String.format("During operation execution, the ring has changed in a way that would make responses violate the consistency level." + + "\n\tReceived responses from: %s" + + "\n\tOld candidates: %s" + + "\n\tNew candidates: %s" + + "\n\tRemaining required: %d", + contacted, candidates, newPlan.readCandidates(), readQuorum)); + } } public static class ForTokenRead extends AbstractForRead<EndpointsForToken, ForTokenRead> { + private final Supplier<ReplicaPlan.ForReadRepair> repairPlan; + public ForTokenRead(Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, EndpointsForToken candidates, - EndpointsForToken contacts) + EndpointsForToken contacts, + Function<ClusterMetadata, ReplicaPlan.ForTokenRead> recompute, + Supplier<ReplicaPlan.ForReadRepair> repairPlan, + Epoch epoch) { - super(keyspace, replicationStrategy, consistencyLevel, candidates, contacts); + super(keyspace, replicationStrategy, consistencyLevel, candidates, contacts, recompute, epoch); + this.repairPlan = repairPlan; } - public ForTokenRead withContacts(EndpointsForToken newContact) + public ForTokenRead withContacts(EndpointsForToken newContacts) { - return new ForTokenRead(keyspace, replicationStrategy, consistencyLevel, candidates, newContact); + ForTokenRead res = new ForTokenRead(keyspace, replicationStrategy, consistencyLevel, candidates, newContacts, recompute, repairPlan, epoch); + res.contacted.addAll(contacted); + return res; + } + + public ForReadRepair repairPlan() + { + if (repairPlan != null) + return repairPlan.get(); + + throw new IllegalStateException("Can not construct a repair plan on a derivative plan."); } } public static class ForRangeRead extends AbstractForRead<EndpointsForRange, ForRangeRead> { + private final Function<Token, ReplicaPlan.ForReadRepair> repairPlan; final AbstractBounds<PartitionPosition> range; final int vnodeCount; @@ -144,11 +236,15 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P> AbstractBounds<PartitionPosition> range, EndpointsForRange candidates, EndpointsForRange contact, - int vnodeCount) + int vnodeCount, + Function<ClusterMetadata, ReplicaPlan.ForRangeRead> recompute, + Function<Token, ReplicaPlan.ForReadRepair> repairPlan, + Epoch epoch) { - super(keyspace, replicationStrategy, consistencyLevel, candidates, contact); + super(keyspace, replicationStrategy, consistencyLevel, candidates, contact, recompute, epoch); this.range = range; this.vnodeCount = vnodeCount; + this.repairPlan = repairPlan; } public AbstractBounds<PartitionPosition> range() { return range; } @@ -160,7 +256,19 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P> public ForRangeRead withContacts(EndpointsForRange newContact) { - return new ForRangeRead(keyspace, replicationStrategy, consistencyLevel, range, readCandidates(), newContact, vnodeCount); + ForRangeRead res = new ForRangeRead(keyspace, replicationStrategy, consistencyLevel, range, readCandidates(), newContact, vnodeCount, recompute, 
repairPlan, epoch); + res.contacted.addAll(contacted); + return res; + } + + public ForReadRepair repairPlan(Token token) + { + if (repairPlan != null) + { + return repairPlan.apply(token); + } + + throw new IllegalStateException("Can not construct a repair plan on a derivative plan."); } } @@ -170,16 +278,25 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P> final EndpointsForToken pending; final EndpointsForToken liveAndDown; final EndpointsForToken live; - - public ForWrite(Keyspace keyspace, AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, EndpointsForToken pending, EndpointsForToken liveAndDown, EndpointsForToken live, EndpointsForToken contact) + final int writeQuorum; + public ForWrite(Keyspace keyspace, + AbstractReplicationStrategy replicationStrategy, + ConsistencyLevel consistencyLevel, + EndpointsForToken pending, + EndpointsForToken liveAndDown, + EndpointsForToken live, + EndpointsForToken contact, + Function<ClusterMetadata, ForWrite> recompute, + Epoch epoch) { - super(keyspace, replicationStrategy, consistencyLevel, contact); + super(keyspace, replicationStrategy, consistencyLevel, contact, recompute, epoch); this.pending = pending; this.liveAndDown = liveAndDown; this.live = live; + this.writeQuorum = consistencyLevel.blockForWrite(replicationStrategy, pending); } - public int writeQuorum() { return consistencyLevel.blockForWrite(replicationStrategy, pending()); } + public int writeQuorum() { return writeQuorum; } /** Replicas that a region of the ring is moving to; not yet ready to serve reads, but should receive writes */ public EndpointsForToken pending() { return pending; } @@ -203,25 +320,135 @@ public interface ReplicaPlan<E extends Endpoints<E>, P extends ReplicaPlan<E, P> private ForWrite copy(ConsistencyLevel newConsistencyLevel, EndpointsForToken newContact) { - return new ForWrite(keyspace, replicationStrategy, newConsistencyLevel, pending(), liveAndDown(), live(), newContact); + ForWrite res = new ForWrite(keyspace, replicationStrategy, newConsistencyLevel, pending(), liveAndDown(), live(), newContact, recompute, epoch); + res.contacted.addAll(contacted); + return res; } ForWrite withConsistencyLevel(ConsistencyLevel newConsistencylevel) { return copy(newConsistencylevel, contacts()); } public ForWrite withContacts(EndpointsForToken newContact) { return copy(consistencyLevel, newContact); } + // TODO: this method can return a collection of received responses that apply, and an explanation on why + // contacts are not enough to satisfy the replicaplan. + public boolean stillAppliesTo(ClusterMetadata newMetadata) + { + if (newMetadata.epoch.equals(epoch)) + return true; + + // If we can't decide, return. + if (recompute == null) + return true; + + ForWrite newPlan = recompute.apply(newMetadata); + + // We do not concern ourselves with down nodes here, at least not if we could make a successful write on them + if (liveAndDown.equals(newPlan.liveAndDown) && pending.equals(newPlan.pending)) + return true; + + int writeQuorum = newPlan.writeQuorum(); + + for (InetAddressAndPort addr : contacted) + { + if (newPlan.liveAndDown().contains(addr)) + writeQuorum--; + } + + if (writeQuorum <= 0) + return true; + + throw new IllegalStateException(String.format("During operation execution, the ring has changed in a way that would make responses violate the consistency level." 
+ + "\n\tReceived responses from: %s" + + "\n\tOld candidates: %s%s" + + "\n\tNew candidates: %s%s" + + "\n\tRemaining required: %d", + contacted, + liveAndDown, pending.isEmpty() ? "" : String.format(" (%s pending)", pending), + newPlan.liveAndDown, newPlan.pending.isEmpty() ? "" : String.format(" (%s pending)", newPlan.pending), + writeQuorum)); + } + public String toString() { return "ReplicaPlan.ForWrite [ CL: " + consistencyLevel + " keyspace: " + keyspace + " liveAndDown: " + liveAndDown + " live: " + live + " contacts: " + contacts() + " ]"; } } + + public static class ForReadRepair extends ForWrite + { + private final Predicate<Replica> skipBlockingFor; + + public ForReadRepair(Keyspace keyspace, + // TODO: replication strategy is now a part of ks + AbstractReplicationStrategy replicationStrategy, + ConsistencyLevel consistencyLevel, + EndpointsForToken pending, + EndpointsForToken liveAndDown, + EndpointsForToken live, + EndpointsForToken contact, + Function<ClusterMetadata, ForWrite> recompute, + Epoch epoch) + { + this(keyspace, replicationStrategy, consistencyLevel, pending, liveAndDown, live, contact, recompute, (r) -> false, epoch); + } + + private ForReadRepair(Keyspace keyspace, + AbstractReplicationStrategy replicationStrategy, + ConsistencyLevel consistencyLevel, + EndpointsForToken pending, + EndpointsForToken liveAndDown, + EndpointsForToken live, + EndpointsForToken contact, + Function<ClusterMetadata, ForWrite> recompute, + Predicate<Replica> skipBlockingFor, + Epoch epoch) + { + super(keyspace, replicationStrategy, consistencyLevel, pending, liveAndDown, live, contact, recompute, epoch); + this.skipBlockingFor = skipBlockingFor; + } + + @Override + public int writeQuorum() + { + int writeQuorum = super.writeQuorum(); + for (Replica contact : contacts()) + { + if (skipBlockingFor.test(contact)) + writeQuorum--; + } + return writeQuorum; + } + + public ForReadRepair skipBlockingFor(Predicate<Replica> newVal) + { + return new ForReadRepair(keyspace, + replicationStrategy, + consistencyLevel, + pending, + liveAndDown, + live, + contacts, + recompute, + newVal, + epoch); + } + } + public static class ForPaxosWrite extends ForWrite { final int requiredParticipants; - ForPaxosWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForToken pending, EndpointsForToken liveAndDown, EndpointsForToken live, EndpointsForToken contact, int requiredParticipants) + ForPaxosWrite(Keyspace keyspace, + ConsistencyLevel consistencyLevel, + EndpointsForToken pending, + EndpointsForToken liveAndDown, + EndpointsForToken live, + EndpointsForToken contact, + int requiredParticipants, + Function<ClusterMetadata, ForWrite> recompute, + Epoch epoch) { - super(keyspace, keyspace.getReplicationStrategy(), consistencyLevel, pending, liveAndDown, live, contact); + super(keyspace, keyspace.getReplicationStrategy(), consistencyLevel, pending, liveAndDown, live, contact, recompute, epoch); this.requiredParticipants = requiredParticipants; } diff --git a/src/java/org/apache/cassandra/locator/ReplicaPlans.java b/src/java/org/apache/cassandra/locator/ReplicaPlans.java index c39862adff..6d3ffd589f 100644 --- a/src/java/org/apache/cassandra/locator/ReplicaPlans.java +++ b/src/java/org/apache/cassandra/locator/ReplicaPlans.java @@ -39,6 +39,7 @@ import org.apache.cassandra.exceptions.UnavailableException; import org.apache.cassandra.gms.FailureDetector; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.service.StorageService; +import 
org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.service.reads.AlwaysSpeculativeRetryPolicy; import org.apache.cassandra.service.reads.SpeculativeRetryPolicy; @@ -46,15 +47,9 @@ import org.apache.cassandra.utils.FBUtilities; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ThreadLocalRandom; -import java.util.function.Consumer; -import java.util.function.Function; -import java.util.function.Predicate; +import java.util.function.*; import static com.google.common.collect.Iterables.any; import static com.google.common.collect.Iterables.filter; @@ -107,10 +102,12 @@ public class ReplicaPlans { assureSufficientLiveReplicas(replicationStrategy, consistencyLevel, liveReplicas, consistencyLevel.blockFor(replicationStrategy), 1); } + static void assureSufficientLiveReplicasForWrite(AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, Endpoints<?> allLive, Endpoints<?> pendingWithDown) throws UnavailableException { assureSufficientLiveReplicas(replicationStrategy, consistencyLevel, allLive, consistencyLevel.blockForWrite(replicationStrategy, pendingWithDown), 0); } + static void assureSufficientLiveReplicas(AbstractReplicationStrategy replicationStrategy, ConsistencyLevel consistencyLevel, Endpoints<?> allLive, int blockFor, int blockForFullReplicas) throws UnavailableException { switch (consistencyLevel) @@ -175,34 +172,85 @@ public class ReplicaPlans /** * Construct a ReplicaPlan for writing to exactly one node, with CL.ONE. This node is *assumed* to be alive. */ - public static ReplicaPlan.ForWrite forSingleReplicaWrite(Keyspace keyspace, Token token, Replica replica) + public static ReplicaPlan.ForWrite forSingleReplicaWrite(ClusterMetadata metadata, Keyspace keyspace, Token token, Function<ClusterMetadata, Replica> replicaSupplier) { - EndpointsForToken one = EndpointsForToken.of(token, replica); + EndpointsForToken one = EndpointsForToken.of(token, replicaSupplier.apply(metadata)); EndpointsForToken empty = EndpointsForToken.empty(token); - return new ReplicaPlan.ForWrite(keyspace, keyspace.getReplicationStrategy(), ConsistencyLevel.ONE, empty, one, one, one); + + return new ReplicaPlan.ForWrite(keyspace, keyspace.getReplicationStrategy(), ConsistencyLevel.ONE, empty, one, one, one, + (newClusterMetadata) -> forSingleReplicaWrite(newClusterMetadata, keyspace, token, replicaSupplier), + metadata.epoch); + } + + /** + * Find a suitable replica as leader for counter update. + * For now, we pick a random replica in the local DC (or ask the snitch if + * there is no replica alive in the local DC). + * + * TODO: if we track the latency of the counter writes (which makes sense + * contrarily to standard writes since there is a read involved), we could + * trust the dynamic snitch entirely, which may be a better solution. It + * is unclear we want to mix those latencies with read latencies, so this + * may be a bit involved. 
+ */ + public static Replica findCounterLeaderReplica(ClusterMetadata metadata, String keyspaceName, DecoratedKey key, String localDataCenter, ConsistencyLevel cl) throws UnavailableException + { + Keyspace keyspace = Keyspace.open(keyspaceName); + IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); + AbstractReplicationStrategy replicationStrategy = keyspace.getReplicationStrategy(); + + EndpointsForToken replicas = metadata.placements.get(keyspace.getMetadata().params.replication).reads.forToken(key.getToken()); + + // CASSANDRA-13043: filter out those endpoints not accepting clients yet, maybe because still bootstrapping + // TODO: replace this with JOINED state. + // TODO don't forget adding replicas = replicas.filter(replica -> FailureDetector.instance.isAlive(replica.endpoint())); after rebase (from CASSANDRA-17411) + replicas = replicas.filter(replica -> StorageService.instance.isRpcReady(replica.endpoint())); + + // TODO have a way to compute the consistency level + if (replicas.isEmpty()) + throw UnavailableException.create(cl, cl.blockFor(replicationStrategy), 0); + + List<Replica> localReplicas = new ArrayList<>(replicas.size()); + + for (Replica replica : replicas) + if (snitch.getDatacenter(replica).equals(localDataCenter)) + localReplicas.add(replica); + + if (localReplicas.isEmpty()) + { + // If the consistency required is local then we should not involve other DCs + if (cl.isDatacenterLocal()) + throw UnavailableException.create(cl, cl.blockFor(replicationStrategy), 0); + + // No endpoint in local DC, pick the closest endpoint according to the snitch + replicas = snitch.sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), replicas); + return replicas.get(0); + } + + return localReplicas.get(ThreadLocalRandom.current().nextInt(localReplicas.size())); } /** * A forwarding counter write is always sent to a single owning coordinator for the range, by the original coordinator * (if it is not itself an owner) */ - public static ReplicaPlan.ForWrite forForwardingCounterWrite(Keyspace keyspace, Token token, Replica replica) + public static ReplicaPlan.ForWrite forForwardingCounterWrite(ClusterMetadata metadata, Keyspace keyspace, Token token, Function<ClusterMetadata, Replica> replica) { - return forSingleReplicaWrite(keyspace, token, replica); + return forSingleReplicaWrite(metadata, keyspace, token, replica); } public static ReplicaPlan.ForWrite forLocalBatchlogWrite() { Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); - Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME); + Keyspace systemKeyspace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME); Replica localSystemReplica = SystemReplicas.getSystemReplica(FBUtilities.getBroadcastAddressAndPort()); ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite( - systemKeypsace.getReplicationStrategy(), + systemKeyspace.getReplicationStrategy(), EndpointsForToken.of(token, localSystemReplica), EndpointsForToken.empty(token) ); - return forWrite(systemKeypsace, ConsistencyLevel.ONE, liveAndDown, liveAndDown, writeAll); + return forWrite(systemKeyspace, ConsistencyLevel.ONE, (cm) -> liveAndDown, (cm) -> true, writeAll); } /** @@ -213,12 +261,13 @@ public class ReplicaPlans */ public static ReplicaPlan.ForWrite forBatchlogWrite(boolean isAny) throws UnavailableException { - // A single case we write not for range or token, but multiple mutations to many tokens - Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); + return 
forBatchlogWrite(ClusterMetadata.current(), isAny); + } - TokenMetadata.Topology topology = StorageService.instance.getTokenMetadata().cachedOnlyTokenMap().getTopology(); + private static ReplicaLayout.ForTokenWrite liveAndDownForBatchlogWrite(Token token, ClusterMetadata metadata, boolean isAny) + { IEndpointSnitch snitch = DatabaseDescriptor.getEndpointSnitch(); - Multimap<String, InetAddressAndPort> localEndpoints = HashMultimap.create(topology.getDatacenterRacks() + Multimap<String, InetAddressAndPort> localEndpoints = HashMultimap.create(metadata.directory.allDatacenterRacks() .get(snitch.getLocalDatacenter())); // Replicas are picked manually: // - replicas should be alive according to the failure detector @@ -230,16 +279,32 @@ public class ReplicaPlans if (chosenEndpoints.isEmpty() && isAny) chosenEndpoints = Collections.singleton(FBUtilities.getBroadcastAddressAndPort()); - Keyspace systemKeypsace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME); - ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWrite( - systemKeypsace.getReplicationStrategy(), - SystemReplicas.getSystemReplicas(chosenEndpoints).forToken(token), - EndpointsForToken.empty(token) - ); + return ReplicaLayout.forTokenWrite(Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME).getReplicationStrategy(), + SystemReplicas.getSystemReplicas(chosenEndpoints).forToken(token), + EndpointsForToken.empty(token)); + } + public static ReplicaPlan.ForWrite forBatchlogWrite(ClusterMetadata metadata, boolean isAny) throws UnavailableException + { + // A single case we write not for range or token, but multiple mutations to many tokens + Token token = DatabaseDescriptor.getPartitioner().getMinimumToken(); + Keyspace systemKeyspace = Keyspace.open(SchemaConstants.SYSTEM_KEYSPACE_NAME); + + ReplicaLayout.ForTokenWrite liveAndDown = liveAndDownForBatchlogWrite(token, metadata, isAny); // Batchlog is hosted by either one node or two nodes from different racks. ConsistencyLevel consistencyLevel = liveAndDown.all().size() == 1 ? 
ConsistencyLevel.ONE : ConsistencyLevel.TWO; - // assume that we have already been given live endpoints, and skip applying the failure detector - return forWrite(systemKeypsace, consistencyLevel, liveAndDown, liveAndDown, writeAll); + + AbstractReplicationStrategy replicationStrategy = liveAndDown.replicationStrategy(); + EndpointsForToken contacts = writeAll.select(consistencyLevel, liveAndDown, liveAndDown); + assureSufficientLiveReplicasForWrite(replicationStrategy, consistencyLevel, liveAndDown.all(), liveAndDown.pending()); + return new ReplicaPlan.ForWrite(systemKeyspace, + replicationStrategy, + consistencyLevel, + liveAndDown.pending(), + liveAndDown.all(), + liveAndDown.all().filter(FailureDetector.isReplicaAlive), + contacts, + (newClusterMetadata) -> forBatchlogWrite(newClusterMetadata, isAny), + metadata.epoch); } private static Collection<InetAddressAndPort> filterBatchlogEndpoints(String localRack, @@ -317,41 +382,80 @@ public class ReplicaPlans return result; } - public static ReplicaPlan.ForWrite forReadRepair(Token token, ReplicaPlan<?, ?> readPlan) throws UnavailableException + public static ReplicaPlan.ForReadRepair forReadRepair(ClusterMetadata metadata, Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, Predicate<Replica> isAlive) throws UnavailableException { - return forWrite(readPlan.keyspace(), readPlan.consistencyLevel(), token, writeReadRepair(readPlan)); + AbstractReplicationStrategy replicationStrategy = keyspace.getReplicationStrategy(); + ReplicaLayout.ForTokenRead forTokenRead = ReplicaLayout.forTokenReadLiveSorted(metadata, keyspace, replicationStrategy, token); + EndpointsForToken candidates = candidatesForRead(consistencyLevel, forTokenRead.natural()); + EndpointsForToken contacts = contactForRead(replicationStrategy, consistencyLevel, true, candidates); + + Selector selector = writeReadRepair(contacts); + + ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWriteLiveAndDown(metadata, keyspace, token); + ReplicaLayout.ForTokenWrite live = liveAndDown.filter(isAlive); + + contacts = selector.select(consistencyLevel, liveAndDown, live); + assureSufficientLiveReplicasForWrite(replicationStrategy, consistencyLevel, live.all(), liveAndDown.pending()); + return new ReplicaPlan.ForReadRepair(keyspace, + replicationStrategy, + consistencyLevel, + liveAndDown.pending(), + liveAndDown.all(), + live.all(), + contacts, + (newClusterMetadata) -> forReadRepair(newClusterMetadata, keyspace, consistencyLevel, token, isAlive), + live.epoch()); } public static ReplicaPlan.ForWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, Selector selector) throws UnavailableException { - return forWrite(keyspace, consistencyLevel, ReplicaLayout.forTokenWriteLiveAndDown(keyspace, token), selector); + return forWrite(ClusterMetadata.current(), keyspace, consistencyLevel, token, selector); + } + + public static ReplicaPlan.ForWrite forWrite(ClusterMetadata metadata, Keyspace keyspace, ConsistencyLevel consistencyLevel, Token token, Selector selector) throws UnavailableException + { + return forWrite(metadata, keyspace, consistencyLevel, (newClusterMetadata) -> ReplicaLayout.forTokenWriteLiveAndDown(newClusterMetadata, keyspace, token), selector); } @VisibleForTesting - public static ReplicaPlan.ForWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, EndpointsForToken natural, EndpointsForToken pending, Predicate<Replica> isAlive, Selector selector) throws UnavailableException + public static ReplicaPlan.ForWrite 
forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Function<ClusterMetadata, EndpointsForToken> natural, Function<ClusterMetadata, EndpointsForToken> pending, Predicate<Replica> isAlive, Selector selector) throws UnavailableException { - return forWrite(keyspace, consistencyLevel, ReplicaLayout.forTokenWrite(keyspace.getReplicationStrategy(), natural, pending), isAlive, selector); + return forWrite(keyspace, consistencyLevel, (newClusterMetadata) -> ReplicaLayout.forTokenWrite(keyspace.getReplicationStrategy(), natural.apply(newClusterMetadata), pending.apply(newClusterMetadata)), isAlive, selector); } - public static ReplicaPlan.ForWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, Selector selector) throws UnavailableException + public static ReplicaPlan.ForWrite forWrite(ClusterMetadata metadata, Keyspace keyspace, ConsistencyLevel consistencyLevel, Function<ClusterMetadata, ReplicaLayout.ForTokenWrite> liveAndDown, Selector selector) throws UnavailableException { - return forWrite(keyspace, consistencyLevel, liveAndDown, FailureDetector.isReplicaAlive, selector); + return forWrite(metadata, keyspace, consistencyLevel, liveAndDown, FailureDetector.isReplicaAlive, selector); } - private static ReplicaPlan.ForWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, Predicate<Replica> isAlive, Selector selector) throws UnavailableException + public static ReplicaPlan.ForWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, Function<ClusterMetadata, ReplicaLayout.ForTokenWrite> liveAndDownSupplier, Predicate<Replica> isAlive, Selector selector) throws UnavailableException { - ReplicaLayout.ForTokenWrite live = liveAndDown.filter(isAlive); - return forWrite(keyspace, consistencyLevel, liveAndDown, live, selector); + return forWrite(ClusterMetadata.current(), keyspace, consistencyLevel, liveAndDownSupplier, isAlive, selector); } - public static ReplicaPlan.ForWrite forWrite(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaLayout.ForTokenWrite liveAndDown, ReplicaLayout.ForTokenWrite live, Selector selector) throws UnavailableException + public static ReplicaPlan.ForWrite forWrite(ClusterMetadata metadata, + Keyspace keyspace, + ConsistencyLevel consistencyLevel, + Function<ClusterMetadata, ReplicaLayout.ForTokenWrite> liveAndDownSupplier, + Predicate<Replica> isAlive, + Selector selector) throws UnavailableException { - assert liveAndDown.replicationStrategy() == live.replicationStrategy() - : "ReplicaLayout liveAndDown and live should be derived from the same replication strategy."; + ReplicaLayout.ForTokenWrite liveAndDown = liveAndDownSupplier.apply(metadata); + ReplicaLayout.ForTokenWrite live = liveAndDown.filter(isAlive); + AbstractReplicationStrategy replicationStrategy = liveAndDown.replicationStrategy(); EndpointsForToken contacts = selector.select(consistencyLevel, liveAndDown, live); assureSufficientLiveReplicasForWrite(replicationStrategy, consistencyLevel, live.all(), liveAndDown.pending()); - return new ReplicaPlan.ForWrite(keyspace, replicationStrategy, consistencyLevel, liveAndDown.pending(), liveAndDown.all(), live.all(), contacts); + + return new ReplicaPlan.ForWrite(keyspace, + replicationStrategy, + consistencyLevel, + liveAndDown.pending(), + liveAndDown.all(), + live.all(), + contacts, + (newClusterMetadata) -> forWrite(newClusterMetadata, keyspace, consistencyLevel, liveAndDownSupplier, isAlive, selector), + 
live.epoch()); } public interface Selector @@ -434,7 +538,7 @@ public class ReplicaPlans * the minimal number of nodes to meet the consistency level, and prefer nodes we contacted on read to minimise * data transfer. */ - public static Selector writeReadRepair(ReplicaPlan<?, ?> readPlan) + public static Selector writeReadRepair(EndpointsForToken originalContacts) { return new Selector() { @@ -446,7 +550,7 @@ public class ReplicaPlans ReplicaCollection.Builder<E> contacts = live.all().newBuilder(live.all().size()); // add all live nodes we might write to that we have already contacted on read - contacts.addAll(filter(live.all(), r -> readPlan.contacts().endpoints().contains(r.endpoint()))); + contacts.addAll(filter(live.all(), r -> originalContacts.endpoints().contains(r.endpoint()))); // finally, add sufficient nodes to achieve our consistency level if (consistencyLevel != EACH_QUORUM) @@ -486,10 +590,15 @@ public class ReplicaPlans * This will select all live nodes as the candidates for the operation. Only the required number of participants */ public static ReplicaPlan.ForPaxosWrite forPaxos(Keyspace keyspace, DecoratedKey key, ConsistencyLevel consistencyForPaxos) throws UnavailableException + { + return forPaxos(ClusterMetadata.current(), keyspace, key, consistencyForPaxos, true); + } + + public static ReplicaPlan.ForPaxosWrite forPaxos(ClusterMetadata metadata, Keyspace keyspace, DecoratedKey key, ConsistencyLevel consistencyForPaxos, boolean throwOnInsufficientLiveReplicas) throws UnavailableException { Token tk = key.getToken(); - ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWriteLiveAndDown(keyspace, tk); + ReplicaLayout.ForTokenWrite liveAndDown = ReplicaLayout.forTokenWriteLiveAndDown(metadata, keyspace, tk); Replicas.temporaryAssertFull(liveAndDown.all()); // TODO CASSANDRA-14547 @@ -506,23 +615,33 @@ public class ReplicaPlans int participants = liveAndDown.all().size(); int requiredParticipants = participants / 2 + 1; // See CASSANDRA-8346, CASSANDRA-833 - EndpointsForToken contacts = live.all(); - if (contacts.size() < requiredParticipants) - throw UnavailableException.create(consistencyForPaxos, requiredParticipants, contacts.size()); - - // We cannot allow CAS operations with 2 or more pending endpoints, see #8346. - // Note that we fake an impossible number of required nodes in the unavailable exception - // to nail home the point that it's an impossible operation no matter how many nodes are live. - if (liveAndDown.pending().size() > 1) - throw new UnavailableException(String.format("Cannot perform LWT operation as there is more than one (%d) pending range movement", liveAndDown.all().size()), - consistencyForPaxos, - participants + 1, - contacts.size()); + if (throwOnInsufficientLiveReplicas) + { + if (live.all().size() < requiredParticipants) + throw UnavailableException.create(consistencyForPaxos, requiredParticipants, live.all().size()); + + // We cannot allow CAS operations with 2 or more pending endpoints, see #8346. + // Note that we fake an impossible number of required nodes in the unavailable exception + // to nail home the point that it's an impossible operation no matter how many nodes are live. 
+ if (liveAndDown.pending().size() > 1) + throw new UnavailableException(String.format("Cannot perform LWT operation as there is more than one (%d) pending range movement", liveAndDown.all().size()), + consistencyForPaxos, + participants + 1, + live.all().size()); + } - return new ReplicaPlan.ForPaxosWrite(keyspace, consistencyForPaxos, liveAndDown.pending(), liveAndDown.all(), live.all(), contacts, requiredParticipants); + assert live.epoch().equals(liveAndDown.epoch()); + return new ReplicaPlan.ForPaxosWrite(keyspace, + consistencyForPaxos, + liveAndDown.pending(), + liveAndDown.all(), + live.all(), + live.all(), + requiredParticipants, + (newClusterMetadata) -> forPaxos(newClusterMetadata, keyspace, key, consistencyForPaxos, false), + live.epoch()); } - private static <E extends Endpoints<E>> E candidatesForRead(ConsistencyLevel consistencyLevel, E liveNaturalReplicas) { return consistencyLevel.isDatacenterLocal() @@ -565,8 +684,23 @@ public class ReplicaPlans */ public static ReplicaPlan.ForTokenRead forSingleReplicaRead(Keyspace keyspace, Token token, Replica replica) { + return forSingleReplicaRead(ClusterMetadata.current(), keyspace, token, replica); + } + + private static ReplicaPlan.ForTokenRead forSingleReplicaRead(ClusterMetadata metadata, Keyspace keyspace, Token token, Replica replica) + { + // todo; replica does not always contain token, figure out why +// if (!metadata.placements.get(keyspace.getMetadata().params.replication).reads.forToken(token).contains(replica)) +// throw UnavailableException.create(ConsistencyLevel.ONE, 1, 1, 0, 0); + EndpointsForToken one = EndpointsForToken.of(token, replica); - return new ReplicaPlan.ForTokenRead(keyspace, keyspace.getReplicationStrategy(), ConsistencyLevel.ONE, one, one); + + return new ReplicaPlan.ForTokenRead(keyspace, keyspace.getReplicationStrategy(), ConsistencyLevel.ONE, one, one, + (newClusterMetadata) -> forSingleReplicaRead(newClusterMetadata, keyspace, token, replica), + () -> { + throw new IllegalStateException("Read repair is not supported for short read/replica filtering protection."); + }, + metadata.epoch); } /** @@ -574,9 +708,25 @@ public class ReplicaPlans */ public static ReplicaPlan.ForRangeRead forSingleReplicaRead(Keyspace keyspace, AbstractBounds<PartitionPosition> range, Replica replica, int vnodeCount) { + return forSingleReplicaRead(ClusterMetadata.current(), keyspace, range, replica, vnodeCount); + } + + private static ReplicaPlan.ForRangeRead forSingleReplicaRead(ClusterMetadata metadata, Keyspace keyspace, AbstractBounds<PartitionPosition> range, Replica replica, int vnodeCount) + { + ReplicaLayout.ForRangeRead forRangeRead = ReplicaLayout.forRangeReadLiveSorted(metadata, keyspace, keyspace.getReplicationStrategy(), range); + + if (!forRangeRead.all().contains(replica)) + throw UnavailableException.create(ConsistencyLevel.ONE, 1, 1, forRangeRead.all().size(), forRangeRead.all().size()); + // TODO: this is unsafe, as one.range() may be inconsistent with our supplied range; should refactor Range/AbstractBounds to single class EndpointsForRange one = EndpointsForRange.of(replica); - return new ReplicaPlan.ForRangeRead(keyspace, keyspace.getReplicationStrategy(), ConsistencyLevel.ONE, range, one, one, vnodeCount); + + return new ReplicaPlan.ForRangeRead(keyspace, keyspace.getReplicationStrategy(), ConsistencyLevel.ONE, range, one, one, vnodeCount, + (newClusterMetadata) -> forSingleReplicaRead(metadata, keyspace, range, replica, vnodeCount), + (token) -> { + throw new IllegalStateException("Read repair is 
not supported for short read/replica filtering protection."); + }, + metadata.epoch); } /** @@ -588,13 +738,29 @@ public class ReplicaPlans * it would break EACH_QUORUM to do so without further filtering */ public static ReplicaPlan.ForTokenRead forRead(Keyspace keyspace, Token token, ConsistencyLevel consistencyLevel, SpeculativeRetryPolicy retry) + { + return forRead(ClusterMetadata.current(), keyspace, token, consistencyLevel, retry, false); + } + + public static ReplicaPlan.ForTokenRead forRead(ClusterMetadata metadata, Keyspace keyspace, Token token, ConsistencyLevel consistencyLevel, SpeculativeRetryPolicy retry) + { + return forRead(metadata, keyspace, token, consistencyLevel, retry, true); + } + + private static ReplicaPlan.ForTokenRead forRead(ClusterMetadata metadata, Keyspace keyspace, Token token, ConsistencyLevel consistencyLevel, SpeculativeRetryPolicy retry, boolean throwOnInsufficientLiveReplicas) { AbstractReplicationStrategy replicationStrategy = keyspace.getReplicationStrategy(); - EndpointsForToken candidates = candidatesForRead(consistencyLevel, ReplicaLayout.forTokenReadLiveSorted(replicationStrategy, token).natural()); + ReplicaLayout.ForTokenRead forTokenRead = ReplicaLayout.forTokenReadLiveSorted(metadata, keyspace, replicationStrategy, token); + EndpointsForToken candidates = candidatesForRead(consistencyLevel, forTokenRead.natural()); EndpointsForToken contacts = contactForRead(replicationStrategy, consistencyLevel, retry.equals(AlwaysSpeculativeRetryPolicy.INSTANCE), candidates); - assureSufficientLiveReplicasForRead(replicationStrategy, consistencyLevel, contacts); - return new ReplicaPlan.ForTokenRead(keyspace, replicationStrategy, consistencyLevel, candidates, contacts); + if (throwOnInsufficientLiveReplicas) + assureSufficientLiveReplicasForRead(replicationStrategy, consistencyLevel, contacts); + + return new ReplicaPlan.ForTokenRead(keyspace, replicationStrategy, consistencyLevel, candidates, contacts, + (newClusterMetadata) -> forRead(newClusterMetadata, keyspace, token, consistencyLevel, retry, false), + () -> forReadRepair(metadata, keyspace, consistencyLevel, token, FailureDetector.isReplicaAlive), + metadata.epoch); } /** @@ -605,13 +771,30 @@ public class ReplicaPlans * There is no speculation for range read queries at present, so we never 'always speculate' here, and a failed response fails the query. 
*/ public static ReplicaPlan.ForRangeRead forRangeRead(Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, int vnodeCount) + { + return forRangeRead(ClusterMetadata.current(), keyspace, consistencyLevel, range, vnodeCount, true); + } + + public static ReplicaPlan.ForRangeRead forRangeRead(ClusterMetadata metadata, Keyspace keyspace, ConsistencyLevel consistencyLevel, AbstractBounds<PartitionPosition> range, int vnodeCount, boolean throwOnInsufficientLiveReplicas) { AbstractReplicationStrategy replicationStrategy = keyspace.getReplicationStrategy(); - EndpointsForRange candidates = candidatesForRead(consistencyLevel, ReplicaLayout.forRangeReadLiveSorted(replicationStrategy, range).natural()); + ReplicaLayout.ForRangeRead forRangeRead = ReplicaLayout.forRangeReadLiveSorted(metadata, keyspace, replicationStrategy, range); + EndpointsForRange candidates = candidatesForRead(consistencyLevel, forRangeRead.natural()); EndpointsForRange contacts = contactForRead(replicationStrategy, consistencyLevel, false, candidates); - assureSufficientLiveReplicasForRead(replicationStrategy, consistencyLevel, contacts); - return new ReplicaPlan.ForRangeRead(keyspace, replicationStrategy, consistencyLevel, range, candidates, contacts, vnodeCount); + if (throwOnInsufficientLiveReplicas) + assureSufficientLiveReplicasForRead(replicationStrategy, consistencyLevel, contacts); + + return new ReplicaPlan.ForRangeRead(keyspace, + replicationStrategy, + consistencyLevel, + range, + candidates, + contacts, + vnodeCount, + (newClusterMetadata) -> forRangeRead(newClusterMetadata, keyspace, consistencyLevel, range, vnodeCount, false), + (token) -> forReadRepair(metadata, keyspace, consistencyLevel, token, FailureDetector.isReplicaAlive), + forRangeRead.epoch()); } /** @@ -619,22 +802,41 @@ public class ReplicaPlans */ public static ReplicaPlan.ForRangeRead maybeMerge(Keyspace keyspace, ConsistencyLevel consistencyLevel, ReplicaPlan.ForRangeRead left, ReplicaPlan.ForRangeRead right) { - // TODO: should we be asserting that the ranges are adjacent? - AbstractBounds<PartitionPosition> newRange = left.range().withNewRight(right.range().right); - EndpointsForRange mergedCandidates = left.readCandidates().keep(right.readCandidates().endpoints()); - AbstractReplicationStrategy replicationStrategy = keyspace.getReplicationStrategy(); + assert left.range.right.equals(right.range.left); - // Check if there are enough shared endpoints for the merge to be possible. - if (!isSufficientLiveReplicasForRead(replicationStrategy, consistencyLevel, mergedCandidates)) + if (!left.epoch.equals(right.epoch)) return null; + EndpointsForRange mergedCandidates = left.readCandidates().keep(right.readCandidates().endpoints()); + AbstractReplicationStrategy replicationStrategy = keyspace.getReplicationStrategy(); EndpointsForRange contacts = contactForRead(replicationStrategy, consistencyLevel, false, mergedCandidates); // Estimate whether merging will be a win or not if (!DatabaseDescriptor.getEndpointSnitch().isWorthMergingForRangeQuery(contacts, left.contacts(), right.contacts())) return null; + AbstractBounds<PartitionPosition> newRange = left.range().withNewRight(right.range().right); + + // Check if there are enough shared endpoints for the merge to be possible. 
+ if (!isSufficientLiveReplicasForRead(replicationStrategy, consistencyLevel, mergedCandidates)) + return null; + + int newVnodeCount = left.vnodeCount() + right.vnodeCount(); + // If we get there, merge this range and the next one - return new ReplicaPlan.ForRangeRead(keyspace, replicationStrategy, consistencyLevel, newRange, mergedCandidates, contacts, left.vnodeCount() + right.vnodeCount()); + return new ReplicaPlan.ForRangeRead(keyspace, + replicationStrategy, + consistencyLevel, + newRange, + mergedCandidates, + contacts, + newVnodeCount, + (newClusterMetadata) -> forRangeRead(newClusterMetadata, keyspace, consistencyLevel, newRange, newVnodeCount, false), + (token) -> { + // It might happen that the ring has moved forward since the operation has started, but because we'll be recomputing a quorum + // after the operation is complete, we will catch inconsistencies either way. + return forReadRepair(ClusterMetadata.current(), keyspace, consistencyLevel, token, FailureDetector.isReplicaAlive); + }, + left.epoch); } } diff --git a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java index 1cee468cd3..27235888ad 100644 --- a/src/java/org/apache/cassandra/net/ResponseVerbHandler.java +++ b/src/java/org/apache/cassandra/net/ResponseVerbHandler.java @@ -20,7 +20,9 @@ package org.apache.cassandra.net; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.cassandra.concurrent.Stage; import org.apache.cassandra.exceptions.RequestFailureReason; +import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.tracing.Tracing; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -35,6 +37,18 @@ class ResponseVerbHandler implements IVerbHandler @Override public void doVerb(Message message) { + if (message.verb() != Verb.TCM_REPLAY_RSP && + message.verb() != Verb.TCM_COMMIT_RSP && + message.verb() != Verb.TCM_REPLICATION && + message.verb() != Verb.TCM_NOTIFY_RSP && + message.verb() != Verb.TCM_DISCOVER_RSP && + // Gossip stage is single-threaded, so we may end up in a deadlock with after-commit hook + // that executes something on the gossip stage as well. 
+ !Stage.GOSSIP.executor().inExecutor()) + { + ClusterMetadataService.instance().maybeCatchup(message.epoch()); + } + RequestCallbacks.CallbackInfo callbackInfo = MessagingService.instance().callbacks.remove(message.id(), message.from()); if (callbackInfo == null) { diff --git a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java index d2c54f9f8f..62d57f5ca6 100644 --- a/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/AbstractWriteResponseHandler.java @@ -34,6 +34,7 @@ import org.apache.cassandra.db.Mutation; import org.apache.cassandra.locator.EndpointsForToken; import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.locator.ReplicaPlan.ForWrite; +import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.utils.concurrent.Condition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -138,6 +139,9 @@ public abstract class AbstractWriteResponseHandler<T> implements RequestCallback throw new WriteFailureException(replicaPlan.consistencyLevel(), ackCount(), blockFor(), writeType, this.failureReasonByEndpoint); } + + if (replicaPlan.stillAppliesTo(ClusterMetadata.current())) + return; } private void throwTimeout() diff --git a/src/java/org/apache/cassandra/service/WriteResponseHandler.java b/src/java/org/apache/cassandra/service/WriteResponseHandler.java index 6fe9e527cd..c2c26e6a9a 100644 --- a/src/java/org/apache/cassandra/service/WriteResponseHandler.java +++ b/src/java/org/apache/cassandra/service/WriteResponseHandler.java @@ -27,6 +27,8 @@ import org.slf4j.LoggerFactory; import org.apache.cassandra.net.Message; import org.apache.cassandra.db.WriteType; +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.utils.FBUtilities; /** * Handles blocking writes for ONE, ANY, TWO, THREE, QUORUM, and ALL consistency levels. 
@@ -56,6 +58,16 @@ public class WriteResponseHandler<T> extends AbstractWriteResponseHandler<T> public void onResponse(Message<T> m) { + if (m == null) + { + replicaPlan.collectSuccess(FBUtilities.getBroadcastAddressAndPort()); + } + else + { + ClusterMetadataService.instance().maybeCatchup(m.epoch()); + replicaPlan.collectSuccess(m.from()); + } + if (responsesUpdater.decrementAndGet(this) == 0) signal(); //Must be last after all subclass processing diff --git a/src/java/org/apache/cassandra/service/paxos/Paxos.java b/src/java/org/apache/cassandra/service/paxos/Paxos.java index 784974e874..f105918356 100644 --- a/src/java/org/apache/cassandra/service/paxos/Paxos.java +++ b/src/java/org/apache/cassandra/service/paxos/Paxos.java @@ -49,6 +49,7 @@ import org.apache.cassandra.locator.ReplicaLayout; import org.apache.cassandra.locator.ReplicaLayout.ForTokenWrite; import org.apache.cassandra.locator.ReplicaPlan.ForRead; import org.apache.cassandra.metrics.ClientRequestSizeMetrics; +import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.config.Config; import org.apache.cassandra.config.DatabaseDescriptor; @@ -85,6 +86,7 @@ import org.apache.cassandra.service.CASRequest; import org.apache.cassandra.service.ClientState; import org.apache.cassandra.service.FailureRecordingCallback.AsMap; import org.apache.cassandra.service.paxos.Commit.Proposal; +import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.service.reads.DataResolver; import org.apache.cassandra.service.reads.repair.NoopReadRepair; import org.apache.cassandra.service.paxos.cleanup.PaxosTableRepairs; @@ -382,16 +384,27 @@ public class Paxos return electorateNatural; } - static Participants get(TableMetadata table, Token token, ConsistencyLevel consistencyForConsensus) + public boolean stillAppliesTo(ClusterMetadata metadata) + { + // TODO: currently, Paxos consistency verification is done via the Participants class. + // Since a consistency check already exists and is available there, we postpone this until later. + return true; + } + + static Participants get(ClusterMetadata metadata, TableMetadata table, Token token, ConsistencyLevel consistencyForConsensus) { - Keyspace keyspace = Keyspace.open(table.keyspace); - ReplicaLayout.ForTokenWrite all = forTokenWriteLiveAndDown(keyspace, token); + KeyspaceMetadata keyspaceMetadata = metadata.schema.getKeyspaceMetadata(table.keyspace); + ReplicaLayout.ForTokenWrite all = forTokenWriteLiveAndDown(keyspaceMetadata, token); ReplicaLayout.ForTokenWrite electorate = consistencyForConsensus.isDatacenterLocal() ?
all.filter(InOurDc.replicas()) : all; EndpointsForToken live = all.all().filter(FailureDetector.isReplicaAlive); + return new Participants(Keyspace.open(table.keyspace), consistencyForConsensus, all, electorate, live); + } - return new Participants(keyspace, consistencyForConsensus, all, electorate, live); + static Participants get(TableMetadata table, Token token, ConsistencyLevel consistencyForConsensus) + { + return get(ClusterMetadata.current(), table, token, consistencyForConsensus); } static Participants get(TableMetadata cfm, DecoratedKey key, ConsistencyLevel consistency) @@ -414,7 +427,7 @@ public class Paxos if (sizeOfConsensusQuorum > sizeOfPoll()) { mark(isWrite, m -> m.unavailables, consistencyForConsensus); - throw new UnavailableException("Cannot achieve consistency level " + consistencyForConsensus, consistencyForConsensus, sizeOfConsensusQuorum, sizeOfPoll()); + throw new UnavailableException("Cannot achieve consistency level " + consistencyForConsensus + " " + sizeOfConsensusQuorum + " > " + sizeOfPoll(), consistencyForConsensus, sizeOfConsensusQuorum, sizeOfPoll()); } } @@ -480,6 +493,16 @@ public class Paxos { throw new UnsupportedOperationException(); } + + public void collectSuccess(InetAddressAndPort inetAddressAndPort) + { + // TODO + } + + public void collectFailure(InetAddressAndPort inetAddressAndPort, RequestFailureReason t) + { + // TODO + } } /** @@ -1012,7 +1035,6 @@ public class Paxos // we don't need to test the side effects, as we just want to start again, and fall through // to the superseded section below prepare = new PaxosPrepare.Superseded(proposeResult.superseded().by, inProgress.participants); - } } diff --git a/src/java/org/apache/cassandra/service/reads/ReadCallback.java b/src/java/org/apache/cassandra/service/reads/ReadCallback.java index c25b1f0f02..b40b10150a 100644 --- a/src/java/org/apache/cassandra/service/reads/ReadCallback.java +++ b/src/java/org/apache/cassandra/service/reads/ReadCallback.java @@ -27,6 +27,8 @@ import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.MessageParams; import org.apache.cassandra.locator.ReplicaPlan; +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.utils.concurrent.Condition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,7 +65,6 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead< public final ResponseResolver<E, P> resolver; final Condition condition = newOneTimeCondition(); private final long queryStartNanoTime; - final int blockFor; // TODO: move to replica plan as well? 
// this uses a plain reference, but is initialised before handoff to any other threads; the later updates // may not be visible to the threads immediately, but ReplicaPlan only contains final fields, so they will never see an uninitialised object final ReplicaPlan.Shared<E, P> replicaPlan; @@ -82,13 +83,12 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead< this.resolver = resolver; this.queryStartNanoTime = queryStartNanoTime; this.replicaPlan = replicaPlan; - this.blockFor = replicaPlan.get().readQuorum(); this.failureReasonByEndpoint = new ConcurrentHashMap<>(); // we don't support read repair (or rapid read protection) for range scans yet (CASSANDRA-6897) - assert !(command instanceof PartitionRangeReadCommand) || blockFor >= replicaPlan().contacts().size(); + assert !(command instanceof PartitionRangeReadCommand) || replicaPlan().readQuorum() >= replicaPlan().contacts().size(); if (logger.isTraceEnabled()) - logger.trace("Blockfor is {}; setting up requests to {}", blockFor, this.replicaPlan); + logger.trace("Blockfor is {}; setting up requests to {}", replicaPlan().readQuorum(), this.replicaPlan); } protected P replicaPlan() @@ -120,7 +120,7 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead< * CASSANDRA-16097 */ int received = resolver.responses.size(); - boolean failed = failures > 0 && (blockFor > received || !resolver.isDataPresent()); + boolean failed = failures > 0 && (replicaPlan().readQuorum() > received || !resolver.isDataPresent()); // If all messages came back as a TIMEOUT then signaled=true and failed=true. // Need to distinguish between a timeout and a failure (network, bad data, etc.), so store an extra field. // see CASSANDRA-17828 @@ -139,32 +139,28 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead< if (!snapshot.isEmpty()) CoordinatorWarnings.update(command, snapshot); } - if (signaled && !failed) + + if (signaled && !failed && replicaPlan().stillAppliesTo(ClusterMetadata.current())) return; if (isTracing()) { String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : ""; - Tracing.trace("{}; received {} of {} responses{}", !timedout ? "Failed" : "Timed out", received, blockFor, gotData); + Tracing.trace("{}; received {} of {} responses{}", !timedout ? "Failed" : "Timed out", received, replicaPlan().readQuorum(), gotData); } else if (logger.isDebugEnabled()) { String gotData = received > 0 ? (resolver.isDataPresent() ? " (including data)" : " (only digests)") : ""; - logger.debug("{}; received {} of {} responses{}", !timedout ? "Failed" : "Timed out", received, blockFor, gotData); + logger.debug("{}; received {} of {} responses{}", !timedout ? "Failed" : "Timed out", received, replicaPlan().readQuorum(), gotData); } if (snapshot != null) - snapshot.maybeAbort(command, replicaPlan().consistencyLevel(), received, blockFor, resolver.isDataPresent(), failureReasonByEndpoint); + snapshot.maybeAbort(command, replicaPlan().consistencyLevel(), received, replicaPlan().readQuorum(), resolver.isDataPresent(), failureReasonByEndpoint); // Same as for writes, see AbstractWriteResponseHandler throw !timedout - ? new ReadFailureException(replicaPlan().consistencyLevel(), received, blockFor, resolver.isDataPresent(), failureReasonByEndpoint) - : new ReadTimeoutException(replicaPlan().consistencyLevel(), received, blockFor, resolver.isDataPresent()); - } - - public int blockFor() - { - return blockFor; + ? 
new ReadFailureException(replicaPlan().consistencyLevel(), received, replicaPlan().readQuorum(), resolver.isDataPresent(), failureReasonByEndpoint) + : new ReadTimeoutException(replicaPlan().consistencyLevel(), received, replicaPlan().readQuorum(), resolver.isDataPresent()); } @Override @@ -176,6 +172,7 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead< if (WarningContext.isSupported(params.keySet())) { RequestFailureReason reason = getWarningContext().updateCounters(params, from); + replicaPlan().collectFailure(message.from(), reason); if (reason != null) { onFailure(message.from(), reason); @@ -183,6 +180,8 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead< } } resolver.preprocess(message); + ClusterMetadataService.instance().maybeCatchup(message.epoch()); + replicaPlan().collectSuccess(message.from()); /* * Ensure that data is present and the response accumulator has properly published the @@ -190,7 +189,7 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead< * the minimum number of required results, but it guarantees at least the minimum will * be accessible when we do signal. (see CASSANDRA-16807) */ - if (resolver.isDataPresent() && resolver.responses.size() >= blockFor) + if (resolver.isDataPresent() && resolver.responses.size() >= replicaPlan().readQuorum()) condition.signalAll(); } @@ -229,7 +228,7 @@ public class ReadCallback<E extends Endpoints<E>, P extends ReplicaPlan.ForRead< failureReasonByEndpoint.put(from, failureReason); - if (blockFor + failuresUpdater.incrementAndGet(this) > replicaPlan().contacts().size()) + if (replicaPlan().readQuorum() + failuresUpdater.incrementAndGet(this) > replicaPlan().contacts().size()) condition.signalAll(); } diff --git a/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java b/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java index 889fa79a50..f3a88181a3 100644 --- a/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java +++ b/src/java/org/apache/cassandra/service/reads/ReplicaFilteringProtection.java @@ -529,11 +529,10 @@ public class ReplicaFilteringProtection<E extends Endpoints<E>> filter); ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forSingleReplicaRead(keyspace, key.getToken(), source); - ReplicaPlan.SharedForTokenRead sharedReplicaPlan = ReplicaPlan.shared(replicaPlan); try { - return executeReadCommand(cmd, source, sharedReplicaPlan); + return executeReadCommand(cmd, source, ReplicaPlan.shared(replicaPlan)); } catch (ReadTimeoutException e) { diff --git a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java index 02e565d536..8e6b6d803d 100644 --- a/src/java/org/apache/cassandra/service/reads/ResponseResolver.java +++ b/src/java/org/apache/cassandra/service/reads/ResponseResolver.java @@ -34,7 +34,6 @@ public abstract class ResponseResolver<E extends Endpoints<E>, P extends Replica protected static final Logger logger = LoggerFactory.getLogger(ResponseResolver.class); protected final ReadCommand command; - // TODO: this doesn't need to be a full ReplicaPlan; just a replica collection protected final Supplier<? 
extends P> replicaPlan; // Accumulator gives us non-blocking thread-safety with optimal algorithmic constraints diff --git a/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanIterator.java b/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanIterator.java index 605c4561b9..8093dacb0c 100644 --- a/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanIterator.java +++ b/src/java/org/apache/cassandra/service/reads/range/ReplicaPlanIterator.java @@ -34,8 +34,7 @@ import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.LocalStrategy; import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.locator.ReplicaPlans; -import org.apache.cassandra.locator.TokenMetadata; -import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.compatibility.TokenRingUtils; import org.apache.cassandra.utils.AbstractIterator; import org.apache.cassandra.utils.Pair; @@ -89,11 +88,11 @@ class ReplicaPlanIterator extends AbstractIterator<ReplicaPlan.ForRangeRead> return Collections.singletonList(queryRange); } - TokenMetadata tokenMetadata = StorageService.instance.getTokenMetadata(); + ClusterMetadata metadata = ClusterMetadata.current(); List<AbstractBounds<PartitionPosition>> ranges = new ArrayList<>(); // divide the queryRange into pieces delimited by the ring and minimum tokens - Iterator<Token> ringIter = TokenRingUtils.ringIterator(tokenMetadata.sortedTokens(), queryRange.left.getToken(), true); + Iterator<Token> ringIter = TokenRingUtils.ringIterator(metadata.tokenMap.tokens(), queryRange.left.getToken(), true); AbstractBounds<PartitionPosition> remainder = queryRange; while (ringIter.hasNext()) { diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java index c8b0e29145..41c5e395bc 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingPartitionRepair.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.cassandra.tcm.ClusterMetadataService; import org.apache.cassandra.utils.concurrent.AsyncFuture; import org.apache.cassandra.utils.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -61,35 +62,30 @@ public class BlockingPartitionRepair extends AsyncFuture<Object> implements RequestCallback<Object> { private final DecoratedKey key; - private final ReplicaPlan.ForWrite writePlan; + private final ReplicaPlan.ForReadRepair repairPlan; private final Map<Replica, Mutation> pendingRepairs; private final CountDownLatch latch; private final Predicate<InetAddressAndPort> shouldBlockOn; - + private final int blockFor; private volatile long mutationsSentTime; - public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, ReplicaPlan.ForWrite writePlan) + public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, ReplicaPlan.ForReadRepair repairPlan) { - this(key, repairs, writePlan, - writePlan.consistencyLevel().isDatacenterLocal() ? InOurDc.endpoints() : Predicates.alwaysTrue()); + this(key, repairs, repairPlan, + repairPlan.consistencyLevel().isDatacenterLocal() ? 
InOurDc.endpoints() : Predicates.alwaysTrue()); } - public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, ReplicaPlan.ForWrite writePlan, Predicate<InetAddressAndPort> shouldBlockOn) + + @VisibleForTesting + public BlockingPartitionRepair(DecoratedKey key, Map<Replica, Mutation> repairs, ReplicaPlan.ForReadRepair repairPlan, Predicate<InetAddressAndPort> shouldBlockOn) { this.key = key; this.pendingRepairs = new ConcurrentHashMap<>(repairs); - this.writePlan = writePlan; + // Remove empty repair mutations from the blockFor total, since we're not sending them. + // Besides, remote dcs can sometimes get involved in dc-local reads. We want to repair them if they do, but we + // shouldn't block the read on them. + this.repairPlan = repairPlan.skipBlockingFor((r) -> shouldBlockOn.test(r.endpoint()) && !repairs.containsKey(r)); this.shouldBlockOn = shouldBlockOn; - - int blockFor = writePlan.writeQuorum(); - // here we remove empty repair mutations from the block for total, since - // we're not sending them mutations - for (Replica participant : writePlan.contacts()) - { - // remote dcs can sometimes get involved in dc-local reads. We want to repair - // them if they do, but they shouldn't interfere with blocking the client read. - if (!repairs.containsKey(participant) && shouldBlockOn.test(participant.endpoint())) - blockFor--; - } + this.blockFor = this.repairPlan.writeQuorum(); // there are some cases where logically identical data can return different digests // For read repair, this would result in ReadRepairHandler being called with a map of @@ -99,15 +95,20 @@ public class BlockingPartitionRepair latch = newCountDownLatch(Math.max(blockFor, 0)); } + public ReplicaPlan.ForReadRepair repairPlan() + { + return repairPlan; + } + int blockFor() { - return writePlan.writeQuorum(); + return blockFor; } @VisibleForTesting int waitingOn() { - return (int) latch.count(); + return latch.count(); } @VisibleForTesting @@ -115,7 +116,7 @@ public class BlockingPartitionRepair { if (shouldBlockOn.test(from)) { - pendingRepairs.remove(writePlan.lookup(from)); + pendingRepairs.remove(repairPlan.lookup(from)); latch.decrement(); } } @@ -123,6 +124,8 @@ public class BlockingPartitionRepair @Override public void onResponse(Message<Object> msg) { + ClusterMetadataService.instance().maybeCatchup(msg.epoch()); + repairPlan.collectSuccess(msg.from()); ack(msg.from()); } @@ -166,6 +169,7 @@ public class BlockingPartitionRepair if (!shouldBlockOn.test(destination.endpoint())) pendingRepairs.remove(destination); + ReadRepairDiagnostics.sendInitialRepair(this, destination.endpoint(), mutation); } } @@ -208,7 +212,7 @@ public class BlockingPartitionRepair if (awaitRepairsUntil(timeout + timeoutUnit.convert(mutationsSentTime, TimeUnit.NANOSECONDS), timeoutUnit)) return; - EndpointsForToken newCandidates = writePlan.liveUncontacted(); + EndpointsForToken newCandidates = repairPlan.liveUncontacted(); if (newCandidates.isEmpty()) return; @@ -230,7 +234,7 @@ public class BlockingPartitionRepair if (mutation == null) { - mutation = BlockingReadRepairs.createRepairMutation(update, writePlan.consistencyLevel(), replica.endpoint(), true); + mutation = BlockingReadRepairs.createRepairMutation(update, repairPlan.consistencyLevel(), replica.endpoint(), true); versionedMutations[versionIdx] = mutation; } @@ -249,7 +253,7 @@ public class BlockingPartitionRepair Keyspace getKeyspace() { - return writePlan.keyspace(); + return repairPlan.keyspace(); } DecoratedKey getKey() @@ -259,6 +263,6 @@ public class
BlockingPartitionRepair ConsistencyLevel getConsistency() { - return writePlan.consistencyLevel(); + return repairPlan.consistencyLevel(); } } diff --git a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java index 9143a475fe..5877f1c666 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/BlockingReadRepair.java @@ -36,6 +36,7 @@ import org.apache.cassandra.locator.Endpoints; import org.apache.cassandra.locator.Replica; import org.apache.cassandra.locator.ReplicaPlan; import org.apache.cassandra.metrics.ReadRepairMetrics; +import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tracing.Tracing; import static java.util.concurrent.TimeUnit.MICROSECONDS; @@ -82,6 +83,8 @@ public class BlockingReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.Fo public void awaitWrites() { BlockingPartitionRepair timedOut = null; + ReplicaPlan.ForReadRepair repairPlan = null; + for (BlockingPartitionRepair repair : repairs) { if (!repair.awaitRepairsUntil(DatabaseDescriptor.getReadRpcTimeout(NANOSECONDS) + queryStartNanoTime, NANOSECONDS)) @@ -89,6 +92,7 @@ public class BlockingReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.Fo timedOut = repair; break; } + repairPlan = repair.repairPlan(); } if (timedOut != null) { @@ -103,10 +107,13 @@ public class BlockingReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.Fo throw new ReadTimeoutException(replicaPlan().consistencyLevel(), received, blockFor, true); } + + if (repairs.isEmpty() || repairPlan.stillAppliesTo(ClusterMetadata.current())) + return; } @Override - public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForWrite writePlan) + public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForReadRepair writePlan) { BlockingPartitionRepair blockingRepair = new BlockingPartitionRepair(partitionKey, mutations, writePlan); blockingRepair.sendInitialRepairs(); diff --git a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java index 5cf72b33cf..45408a4fcd 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/NoopReadRepair.java @@ -75,7 +75,7 @@ public class NoopReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRea } @Override - public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForWrite writePlan) + public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForReadRepair writePlan) { } diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java index 72a12980e7..67bd4de352 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/ReadOnlyReadRepair.java @@ -61,7 +61,7 @@ public class ReadOnlyReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.Fo } @Override - public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForWrite writePlan) + public void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, 
ReplicaPlan.ForReadRepair writePlan) { throw new UnsupportedOperationException("ReadOnlyReadRepair shouldn't be trying to repair partitions"); } diff --git a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java index a6a9be29f9..5358aaf214 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/ReadRepair.java @@ -92,5 +92,5 @@ public interface ReadRepair<E extends Endpoints<E>, P extends ReplicaPlan.ForRea * Repairs a partition _after_ receiving data responses. This method receives replica list, since * we will block repair only on the replicas that have responded. */ - void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForWrite writePlan); + void repairPartition(DecoratedKey partitionKey, Map<Replica, Mutation> mutations, ReplicaPlan.ForReadRepair writePlan); } diff --git a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java index 079080ab6f..1bfe936516 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java +++ b/src/java/org/apache/cassandra/service/reads/repair/RowIteratorMergeListener.java @@ -51,7 +51,6 @@ import org.apache.cassandra.locator.Endpoints; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.Replica; import org.apache.cassandra.locator.ReplicaPlan; -import org.apache.cassandra.locator.ReplicaPlans; import org.apache.cassandra.schema.ColumnMetadata; public class RowIteratorMergeListener<E extends Endpoints<E>> @@ -69,7 +68,7 @@ public class RowIteratorMergeListener<E extends Endpoints<E>> private final Row.Builder[] currentRows; private final RowDiffListener diffListener; private final ReplicaPlan.ForRead<E, ?> readPlan; - private final ReplicaPlan.ForWrite writePlan; + private final ReplicaPlan.ForReadRepair repairPlan; // The partition level deletion for the merge row. private DeletionTime partitionLevelDeletion; @@ -88,7 +87,10 @@ public class RowIteratorMergeListener<E extends Endpoints<E>> this.columns = columns; this.isReversed = isReversed; this.readPlan = readPlan; - this.writePlan = ReplicaPlans.forReadRepair(partitionKey.getToken(), readPlan); + if (readPlan instanceof ReplicaPlan.ForTokenRead) + this.repairPlan = ((ReplicaPlan.ForTokenRead)readPlan).repairPlan(); + else + this.repairPlan = ((ReplicaPlan.ForRangeRead)readPlan).repairPlan(partitionKey.getToken()); int size = readPlan.contacts().size(); this.writeBackTo = new BitSet(size); @@ -96,18 +98,18 @@ public class RowIteratorMergeListener<E extends Endpoints<E>> int i = 0; for (Replica replica : readPlan.contacts()) { - if (writePlan.contacts().endpoints().contains(replica.endpoint())) + if (repairPlan.contacts().endpoints().contains(replica.endpoint())) writeBackTo.set(i); ++i; } } - // If we are contacting any nodes we didn't read from, we are likely handling a range movement. + // If we are contacting any nodes we didn't read from, we are handling a range movement (the likeliest scenario is a pending replica). // In this case we need to send all differences to these nodes, as we do not (with present design) know which // node they bootstrapped from, and so which data we need to duplicate. 
// In reality, there will be situations where we are simply sending the same number of writes to different nodes // and in this case we could probably avoid building a full difference, and only ensure each write makes it to // some other node, but it is probably not worth special casing this scenario. - this.buildFullDiff = Iterables.any(writePlan.contacts().endpoints(), e -> !readPlan.contacts().endpoints().contains(e)); + this.buildFullDiff = Iterables.any(repairPlan.contacts().endpoints(), e -> !readPlan.contacts().endpoints().contains(e)); this.repairs = new PartitionUpdate.Builder[size + (buildFullDiff ? 1 : 0)]; this.currentRows = new Row.Builder[size]; this.sourceDeletionTime = new DeletionTime[size]; @@ -376,12 +378,12 @@ public class RowIteratorMergeListener<E extends Endpoints<E>> if (buildFullDiff && repairs[repairs.length - 1] != null) fullDiffRepair = repairs[repairs.length - 1].build(); - Map<Replica, Mutation> mutations = Maps.newHashMapWithExpectedSize(writePlan.contacts().size()); + Map<Replica, Mutation> mutations = Maps.newHashMapWithExpectedSize(repairPlan.contacts().size()); ObjectIntHashMap<InetAddressAndPort> sourceIds = new ObjectIntHashMap<>(((repairs.length + 1) * 4) / 3); for (int i = 0 ; i < readPlan.contacts().size() ; ++i) sourceIds.put(readPlan.contacts().get(i).endpoint(), 1 + i); - for (Replica replica : writePlan.contacts()) + for (Replica replica : repairPlan.contacts()) { PartitionUpdate update = null; int i = -1 + sourceIds.get(replica.endpoint()); @@ -397,6 +399,6 @@ public class RowIteratorMergeListener<E extends Endpoints<E>> mutations.put(replica, mutation); } - readRepair.repairPartition(partitionKey, mutations, writePlan); + readRepair.repairPartition(partitionKey, mutations, repairPlan); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
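A minimal, self-contained sketch of the coordinator-side pattern the diff applies on the read/write path: compare the epoch carried by each response with the locally known metadata, catch up from the CMS when a peer is ahead, and re-check that the replica plan computed at the start of the operation still satisfies the consistency level before signalling success. Epoch, MetadataSource and Plan below are hypothetical stand-ins used only for illustration, not the org.apache.cassandra.tcm / locator classes touched above.

    // Hypothetical stand-in types; the real equivalents live in org.apache.cassandra.tcm and locator.
    final class Epoch implements Comparable<Epoch>
    {
        final long value;
        Epoch(long value) { this.value = value; }
        @Override
        public int compareTo(Epoch other) { return Long.compare(value, other.value); }
    }

    interface MetadataSource
    {
        Epoch currentEpoch();
        void catchUpTo(Epoch remote); // e.g. replay newer log entries from the CMS
    }

    interface Plan
    {
        // analogous to replicaPlan.stillAppliesTo(ClusterMetadata.current()) in the diff
        boolean stillSatisfiesConsistencyLevel(Epoch current);
    }

    final class ResponseHandlerSketch
    {
        private final MetadataSource metadata;
        private final Plan plan;
        private final int blockFor;
        private int acks;

        ResponseHandlerSketch(MetadataSource metadata, Plan plan, int blockFor)
        {
            this.metadata = metadata;
            this.plan = plan;
            this.blockFor = blockFor;
        }

        /** Called once per replica response; returns true when the operation may complete. */
        boolean onResponse(Epoch remoteEpoch)
        {
            // 1. A replica that has seen newer cluster metadata forces the coordinator to catch up first.
            if (remoteEpoch.compareTo(metadata.currentEpoch()) > 0)
                metadata.catchUpTo(remoteEpoch);

            // 2. Count the acknowledgement.
            acks++;

            // 3. Succeed only if the original plan still satisfies the consistency level under the
            //    metadata now held; otherwise the caller recomputes the plan or fails the operation.
            return acks >= blockFor && plan.stillSatisfiesConsistencyLevel(metadata.currentEpoch());
        }
    }

The sketch is single-threaded and omits the atomic updaters and signalling conditions the real handlers rely on; it is meant only to show the order of the epoch check, the catch-up, and the final plan validation.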

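The maybeMerge() change above also makes the merge of adjacent range-read sub-plans conditional on both plans having been computed under the same epoch. The following is a small stand-alone sketch of that guard under simplified assumptions (integer range bounds, string endpoints), again with hypothetical types rather than the real ReplicaPlan classes.

    // Hypothetical sketch of the merge guard: adjacent sub-ranges are only merged when both
    // plans were computed against the same metadata epoch and still share enough live replicas.
    import java.util.HashSet;
    import java.util.Set;

    final class RangePlanSketch
    {
        final long epoch;
        final int left, right;            // token range bounds, simplified to ints
        final Set<String> candidates;     // replica endpoints, simplified to strings

        RangePlanSketch(long epoch, int left, int right, Set<String> candidates)
        {
            this.epoch = epoch;
            this.left = left;
            this.right = right;
            this.candidates = candidates;
        }

        /** Returns the merged plan, or null when merging is not possible or not worthwhile. */
        static RangePlanSketch maybeMerge(RangePlanSketch a, RangePlanSketch b, int required)
        {
            assert a.right == b.left : "ranges must be adjacent";

            if (a.epoch != b.epoch)       // plans computed under different epochs are never merged
                return null;

            Set<String> shared = new HashSet<>(a.candidates);
            shared.retainAll(b.candidates);
            if (shared.size() < required) // not enough shared live replicas for the consistency level
                return null;

            return new RangePlanSketch(a.epoch, a.left, b.right, shared);
        }
    }

Returning null mirrors the patch's behaviour of keeping the two sub-plans separate whenever the epochs differ or the shared candidate set cannot satisfy the consistency level.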