This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch cep-21-tcm in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 9b8bd95176ffad024e4beaab7ae84abfeb1d93f6
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Wed Mar 1 12:52:29 2023 +0000

    [CEP-21] Add basics of ownership and data placement

    Introduce new classes for representing placement of data ranges on
    replicas, along with the movement of data via transitions from one
    placement to the next. Eventually, these placements will be statically
    calculated in response to events which alter either the topology of the
    cluster (i.e. adding/removing/moving nodes) or the replication profile
    of the data itself (i.e. creating/altering keyspaces). These triggering
    events will be distributed and enacted consistently using the global log.

    Co-authored-by: Marcus Eriksson <[email protected]>
    Co-authored-by: Alex Petrov <[email protected]>
    Co-authored-by: Sam Tunnicliffe <[email protected]>
---
 .../cassandra/dht/OrderPreservingPartitioner.java  |  24 +
 src/java/org/apache/cassandra/dht/Range.java       |  27 ++
 src/java/org/apache/cassandra/dht/Token.java       |  27 ++
 .../apache/cassandra/locator/RangesByEndpoint.java |  69 +++
 .../cassandra/tcm/ownership/DataPlacement.java     | 211 +++++++++
 .../cassandra/tcm/ownership/DataPlacements.java    | 216 +++++++++
 .../org/apache/cassandra/tcm/ownership/Delta.java  | 125 +++++
 .../tcm/ownership/GlobalPlacementDelta.java        | 163 +++++++
 .../cassandra/tcm/ownership/MovementMap.java       |  89 ++++
 .../cassandra/tcm/ownership/PlacementDeltas.java   | 244 ++++++++++
 .../cassandra/tcm/ownership/PlacementForRange.java | 525 +++++++++++++++++++++
 .../cassandra/tcm/ownership/PlacementProvider.java |  53 +++
 .../tcm/ownership/PlacementTransitionPlan.java     | 128 +++++
 .../cassandra/tcm/ownership/ReplicationMap.java    | 106 +++++
 .../cassandra/tcm/sequences/LockedRanges.java      | 430 +++++++++++++++++
 15 files changed, 2437 insertions(+)

diff --git a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
index 2d4def95d0..fb9d0011a7 100644
--- a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
+++ b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java
@@ -45,6 +45,15 @@ public class OrderPreservingPartitioner implements IPartitioner
     private static final String rndchars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
 
     public static final StringToken MINIMUM = new StringToken("");
+    public static final StringToken MAXIMUM = new StringToken("") {
+        public int compareTo(Token o)
+        {
+            if (o == MAXIMUM)
+                return 0;
+
+            return 1;
+        }
+    };
 
     public static final BigInteger CHAR_MASK = new BigInteger("65535");
 
@@ -116,6 +125,11 @@ public class OrderPreservingPartitioner implements IPartitioner
         return MINIMUM;
     }
 
+    public StringToken getMaximumToken()
+    {
+        return MAXIMUM;
+    }
+
     public StringToken getRandomToken()
     {
         return getRandomToken(ThreadLocalRandom.current());
@@ -208,6 +222,16 @@ public class OrderPreservingPartitioner implements IPartitioner
         {
             return ByteSource.of(token, version);
        }
+
+        @Override
+        public int compareTo(Token o)
+        {
+            // todo (rebase): I have no recollection of why this is needed - investigate
+            if (o == MAXIMUM)
+                return -1;
+
+            return super.compareTo(o);
+        }
     }
 
     public StringToken getToken(ByteBuffer key)
diff --git a/src/java/org/apache/cassandra/dht/Range.java b/src/java/org/apache/cassandra/dht/Range.java
index 2c468990d2..581602cccd 100644
--- a/src/java/org/apache/cassandra/dht/Range.java
+++ b/src/java/org/apache/cassandra/dht/Range.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.dht;
 
+import java.io.IOException;
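// (Editorial aside, not part of this patch: a minimal round-trip sketch for the
//  MetadataSerializer pattern the serializers below introduce. DataOutputBuffer and
//  DataInputBuffer are the existing helpers in org.apache.cassandra.io.util; `range`
//  and `version` stand in for caller-supplied values.
//
//      DataOutputBuffer out = new DataOutputBuffer();
//      Range.serializer.serialize(range, out, version);   // wire format pinned internally to MessagingService.VERSION_40
//      try (DataInputBuffer in = new DataInputBuffer(out.buffer(), false))
//      {
//          Range<Token> copy = Range.serializer.deserialize(in, version);  // partitioner resolved via IPartitioner.global()
//      }
//  Note the Version parameter is accepted but, for now, both serialize and deserialize
//  delegate to the fixed 4.0 token serialization.)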
import java.io.Serializable; import java.util.*; import java.util.function.Predicate; @@ -25,6 +26,11 @@ import com.google.common.collect.Iterables; import org.apache.commons.lang3.ObjectUtils; import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.tcm.serialization.MetadataSerializer; +import org.apache.cassandra.tcm.serialization.Version; import org.apache.cassandra.utils.Pair; /** @@ -38,6 +44,7 @@ import org.apache.cassandra.utils.Pair; */ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implements Comparable<Range<T>>, Serializable { + public static final Serializer serializer = new Serializer(); public static final long serialVersionUID = 1L; public Range(T left, T right) @@ -626,4 +633,24 @@ public class Range<T extends RingPosition<T>> extends AbstractBounds<T> implemen } } } + + public static class Serializer implements MetadataSerializer<Range<Token>> + { + private static final int SERDE_VERSION = MessagingService.VERSION_40; + + public void serialize(Range<Token> t, DataOutputPlus out, Version version) throws IOException + { + tokenSerializer.serialize(t, out, SERDE_VERSION); + } + + public Range<Token> deserialize(DataInputPlus in, Version version) throws IOException + { + return (Range<Token>) tokenSerializer.deserialize(in, IPartitioner.global(), SERDE_VERSION); + } + + public long serializedSize(Range<Token> t, Version version) + { + return tokenSerializer.serializedSize(t, SERDE_VERSION); + } + } } diff --git a/src/java/org/apache/cassandra/dht/Token.java b/src/java/org/apache/cassandra/dht/Token.java index 3543dabc0e..3801654e61 100644 --- a/src/java/org/apache/cassandra/dht/Token.java +++ b/src/java/org/apache/cassandra/dht/Token.java @@ -25,15 +25,20 @@ import java.nio.ByteBuffer; import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.db.TypeSizes; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.util.DataInputPlus; import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.utils.bytecomparable.ByteComparable; import org.apache.cassandra.utils.bytecomparable.ByteSource; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.serialization.Version; +import org.apache.cassandra.net.MessagingService; public abstract class Token implements RingPosition<Token>, Serializable { private static final long serialVersionUID = 1L; public static final TokenSerializer serializer = new TokenSerializer(); + public static final MetadataSerializer metadataSerializer = new MetadataSerializer(); public static abstract class TokenFactory { @@ -90,6 +95,28 @@ public abstract class Token implements RingPosition<Token>, Serializable } } + public static class MetadataSerializer implements org.apache.cassandra.tcm.serialization.MetadataSerializer<Token> + { + private static final int SERDE_VERSION = MessagingService.VERSION_40; + + public void serialize(Token t, DataOutputPlus out, Version version) throws IOException + { + serializer.serialize(t, out, SERDE_VERSION); + } + + public Token deserialize(DataInputPlus in, Version version) throws IOException + { + // This is only ever used to deserialize Tokens from this cluster and as the partitioner can + // never be changed, it's safe to assume that the right implementation is provided by ClusterMetadata + return 
serializer.deserialize(in, ClusterMetadata.current().partitioner, SERDE_VERSION); + } + + public long serializedSize(Token t, Version version) + { + return serializer.serializedSize(t, SERDE_VERSION); + } + } + public static class TokenSerializer implements IPartitionerDependentSerializer<Token> { public void serialize(Token token, DataOutputPlus out, int version) throws IOException diff --git a/src/java/org/apache/cassandra/locator/RangesByEndpoint.java b/src/java/org/apache/cassandra/locator/RangesByEndpoint.java index 023d7ee2b4..98bac81400 100644 --- a/src/java/org/apache/cassandra/locator/RangesByEndpoint.java +++ b/src/java/org/apache/cassandra/locator/RangesByEndpoint.java @@ -22,10 +22,25 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; +import java.io.IOException; import java.util.Map; +import java.util.Set; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.tcm.serialization.MetadataSerializer; +import org.apache.cassandra.tcm.serialization.Version; public class RangesByEndpoint extends ReplicaMultimap<InetAddressAndPort, RangesAtEndpoint> { + public static RangesByEndpoint EMPTY = new RangesByEndpoint.Builder().build(); + + public static final Serializer serializer = new Serializer(); + public RangesByEndpoint(Map<InetAddressAndPort, RangesAtEndpoint> map) { super(map); @@ -53,4 +68,58 @@ public class RangesByEndpoint extends ReplicaMultimap<InetAddressAndPort, Ranges } } + public static class Serializer implements MetadataSerializer<RangesByEndpoint> + { + public void serialize(RangesByEndpoint t, DataOutputPlus out, Version version) throws IOException + { + Set<Map.Entry<InetAddressAndPort, RangesAtEndpoint>> entries = t.entrySet(); + out.writeInt(entries.size()); + for (Map.Entry<InetAddressAndPort, RangesAtEndpoint> entry : entries) + { + InetAddressAndPort.MetadataSerializer.serializer.serialize(entry.getKey(), out, version); + AbstractReplicaCollection.ReplicaList replicas = entry.getValue().list; + out.writeInt(replicas.size()); + for (Replica r : replicas) + { + IPartitioner.validate(r.range()); + Range.serializer.serialize(r.range(), out, version); + out.writeBoolean(r.isFull()); + } + } + } + + public RangesByEndpoint deserialize(DataInputPlus in, Version version) throws IOException + { + RangesByEndpoint.Builder builder = new Builder(); + int size = in.readInt(); + for (int i=0; i<size; i++) + { + InetAddressAndPort endpoint = InetAddressAndPort.MetadataSerializer.serializer.deserialize(in, version); + int replicas = in.readInt(); + for (int j=0; j<replicas; j++) + { + Range<Token> range = Range.serializer.deserialize(in, version); + boolean full = in.readBoolean(); + builder.put(endpoint, new Replica(endpoint, range, full)); + } + } + return builder.build(); + } + + public long serializedSize(RangesByEndpoint t, Version version) + { + long size = TypeSizes.INT_SIZE; + for (Map.Entry<InetAddressAndPort, RangesAtEndpoint> entry : t.entrySet()) + { + size += InetAddressAndPort.MetadataSerializer.serializer.serializedSize(entry.getKey(), version); + size += TypeSizes.INT_SIZE; + for (Replica r : entry.getValue().list) + { + size += Range.serializer.serializedSize(r.range(), version); + size += TypeSizes.BOOL_SIZE; + } + } + return size; 
+ } + } } diff --git a/src/java/org/apache/cassandra/tcm/ownership/DataPlacement.java b/src/java/org/apache/cassandra/tcm/ownership/DataPlacement.java new file mode 100644 index 0000000000..7a55b67512 --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/ownership/DataPlacement.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.ownership; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.tcm.serialization.MetadataSerializer; +import org.apache.cassandra.tcm.serialization.Version; + +public class DataPlacement +{ + public static final Serializer serializer = new Serializer(); + + private static final DataPlacement EMPTY = new DataPlacement(PlacementForRange.EMPTY, PlacementForRange.EMPTY); + + // TODO make tree of just EndpointsForRange, navigable by EFR.range() + // TODO combine peers into a single entity with one vote in any quorum + // (e.g. 
old & new peer must both respond to count one replica) + public final PlacementForRange reads; + public final PlacementForRange writes; + + public DataPlacement(PlacementForRange reads, + PlacementForRange writes) + { + this.reads = reads; + this.writes = writes; + } + + /** + * A union of read and write endpoints for range, for watermark purposes + */ + public Set<InetAddressAndPort> affectedReplicas(Range<Token> range) + { + Set<InetAddressAndPort> endpoints = new HashSet<>(); + for (Replica r : reads.matchRange(range)) + endpoints.add(r.endpoint()); + for (Replica r : writes.matchRange(range)) + endpoints.add(r.endpoint()); + return endpoints; + } + + public DataPlacement combineReplicaGroups(DataPlacement other) + { + return new DataPlacement(PlacementForRange.builder() + .withReplicaGroups(reads.replicaGroups().values()) + .withReplicaGroups(other.reads.replicaGroups.values()) + .build(), + PlacementForRange.builder() + .withReplicaGroups(writes.replicaGroups().values()) + .withReplicaGroups(other.writes.replicaGroups.values()) + .build()); + } + + public PlacementDeltas.PlacementDelta difference(DataPlacement next) + { + return new PlacementDeltas.PlacementDelta(reads.difference(next.reads), + writes.difference(next.writes)); + } + + public DataPlacement mergeRangesForPlacement(Set<Token> leavingTokens) + { + return new DataPlacement(PlacementForRange.mergeRangesForPlacement(leavingTokens, reads), + PlacementForRange.mergeRangesForPlacement(leavingTokens, writes)); + } + + public DataPlacement splitRangesForPlacement(List<Token> tokens) + { + return new DataPlacement(PlacementForRange.splitRangesForPlacement(tokens, reads), + PlacementForRange.splitRangesForPlacement(tokens, writes)); + } + + public static DataPlacement empty() + { + return EMPTY; + } + + public static Builder builder() + { + return new Builder(PlacementForRange.builder(), + PlacementForRange.builder()); + } + + public Builder unbuild() + { + return new Builder(reads.unbuild(), writes.unbuild()); + } + public static class Builder + { + public final PlacementForRange.Builder reads; + public final PlacementForRange.Builder writes; + + public Builder(PlacementForRange.Builder reads, PlacementForRange.Builder writes) + { + this.reads = reads; + this.writes = writes; + } + + public Builder withWriteReplica(Replica replica) + { + this.writes.withReplica(replica); + return this; + } + + public Builder withoutWriteReplica(Replica replica) + { + this.writes.withoutReplica(replica); + return this; + } + + public Builder withReadReplica(Replica replica) + { + this.reads.withReplica(replica); + return this; + } + + public Builder withoutReadReplica(Replica replica) + { + this.reads.withoutReplica(replica); + return this; + } + + public DataPlacement build() + { + return new DataPlacement(reads.build(), writes.build()); + } + } + + @Override + public String toString() + { + return "DataPlacement{" + + "reads=" + toString(reads.replicaGroups) + + ", writes=" + toString(writes.replicaGroups) + + '}'; + } + + public static String toString(Map<?, ?> predicted) + { + StringBuilder sb = new StringBuilder(); + for (Map.Entry<?, ?> e : predicted.entrySet()) + { + sb.append(e.getKey()).append("=").append(e.getValue()).append(",\n"); + } + + return sb.toString(); + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (!(o instanceof DataPlacement)) return false; + DataPlacement that = (DataPlacement) o; + return Objects.equals(reads, that.reads) && Objects.equals(writes, that.writes); + } + + @Override + 
public int hashCode() + { + return Objects.hash(reads, writes); + } + + public static class Serializer implements MetadataSerializer<DataPlacement> + { + public void serialize(DataPlacement t, DataOutputPlus out, Version version) throws IOException + { + PlacementForRange.serializer.serialize(t.reads, out, version); + PlacementForRange.serializer.serialize(t.writes, out, version); + } + + public DataPlacement deserialize(DataInputPlus in, Version version) throws IOException + { + PlacementForRange reads = PlacementForRange.serializer.deserialize(in, version); + PlacementForRange writes = PlacementForRange.serializer.deserialize(in, version); + return new DataPlacement(reads, writes); + } + + public long serializedSize(DataPlacement t, Version version) + { + return PlacementForRange.serializer.serializedSize(t.reads, version) + + PlacementForRange.serializer.serializedSize(t.writes, version); + } + } +} diff --git a/src/java/org/apache/cassandra/tcm/ownership/DataPlacements.java b/src/java/org/apache/cassandra/tcm/ownership/DataPlacements.java new file mode 100644 index 0000000000..3107a7d550 --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/ownership/DataPlacements.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tcm.ownership; + +import java.io.IOException; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.function.BiConsumer; + +import com.google.common.collect.Maps; + +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.EndpointsForRange; +import org.apache.cassandra.locator.Replica; +import org.apache.cassandra.schema.ReplicationParams; +import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.MetadataValue; +import org.apache.cassandra.tcm.serialization.MetadataSerializer; +import org.apache.cassandra.tcm.serialization.Version; +import org.apache.cassandra.tcm.transformations.cms.EntireRange; +import org.apache.cassandra.utils.FBUtilities; + +import static org.apache.cassandra.db.TypeSizes.sizeof; +import static org.apache.cassandra.tcm.transformations.cms.EntireRange.entireRange; + +public class DataPlacements extends ReplicationMap<DataPlacement> implements MetadataValue<DataPlacements> +{ + public static Serializer serializer = new Serializer(); + + public static final DataPlacements EMPTY = DataPlacements.builder(1).build(); + private static DataPlacement LOCAL_PLACEMENT; + + private final Epoch lastModified; + + private DataPlacements(Epoch lastModified, Map<ReplicationParams, DataPlacement> map) + { + super(map); + this.lastModified = lastModified; + } + + protected DataPlacement defaultValue() + { + return DataPlacement.empty(); + } + + public void withDistributed(BiConsumer<ReplicationParams, DataPlacement> consumer) + { + forEach(e -> { + if (e.getKey().isLocal() || e.getKey().isMeta()) + return; + + consumer.accept(e.getKey(), e.getValue()); + }); + } + + protected DataPlacement localOnly() + { + // it's unlikely to happen, but perfectly safe to create multiple times, so no need to lock or statically init + if (null == LOCAL_PLACEMENT) + { + PlacementForRange placement = new PlacementForRange(Collections.singletonMap(entireRange, + EndpointsForRange.of(EntireRange.replica(FBUtilities.getBroadcastAddressAndPort())))); + LOCAL_PLACEMENT = new DataPlacement(placement, placement); + } + return LOCAL_PLACEMENT; + } + + public DataPlacements combineReplicaGroups(DataPlacements end) + { + DataPlacements start = this; + if (start.isEmpty()) + return end; + Builder mapBuilder = DataPlacements.builder(start.size()); + start.asMap().forEach((params, placement) -> + mapBuilder.with(params, placement.combineReplicaGroups(end.get(params)))); + return mapBuilder.build(); + } + + @Override + public DataPlacements withLastModified(Epoch epoch) + { + return new DataPlacements(epoch, asMap()); + } + + @Override + public Epoch lastModified() + { + return lastModified; + } + + @Override + public String toString() + { + return "DataPlacements{" + + "lastModified=" + lastModified + + ", placementMap=" + asMap() + + '}'; + } + + public static DataPlacements sortReplicaGroups(DataPlacements placements, Comparator<Replica> comparator) + { + Builder builder = DataPlacements.builder(placements.size()); + placements.asMap().forEach((params, placement) -> { + PlacementForRange.Builder reads = PlacementForRange.builder(placement.reads.replicaGroups().size()); + placement.reads.replicaGroups().forEach((range, endpoints) -> { + reads.withReplicaGroup(endpoints.sorted(comparator)); + }); + PlacementForRange.Builder writes = PlacementForRange.builder(placement.writes.replicaGroups().size()); + 
placement.writes.replicaGroups().forEach((range, endpoints) -> { + writes.withReplicaGroup(endpoints.sorted(comparator)); + }); + builder.with(params, new DataPlacement(reads.build(), writes.build())); + }); + return builder.build(); + } + + public static DataPlacements empty() + { + return EMPTY; + } + + public static Builder builder(int expectedSize) + { + return new Builder(new HashMap<>(expectedSize)); + } + + public static Builder builder(Map<ReplicationParams, DataPlacement> map) + { + return new Builder(map); + } + + public Builder unbuild() + { + return new Builder(new HashMap<>(this.asMap())); + } + + public static class Builder + { + private final Map<ReplicationParams, DataPlacement> map; + private Builder(Map<ReplicationParams, DataPlacement> map) + { + this.map = map; + } + + public Builder with(ReplicationParams params, DataPlacement placement) + { + map.put(params, placement); + return this; + } + + public DataPlacements build() + { + return new DataPlacements(Epoch.EMPTY, map); + } + } + + public static class Serializer implements MetadataSerializer<DataPlacements> + { + public void serialize(DataPlacements t, DataOutputPlus out, Version version) throws IOException + { + Map<ReplicationParams, DataPlacement> map = t.asMap(); + out.writeInt(map.size()); + for (Map.Entry<ReplicationParams, DataPlacement> entry : map.entrySet()) + { + ReplicationParams.serializer.serialize(entry.getKey(), out, version); + DataPlacement.serializer.serialize(entry.getValue(), out, version); + } + Epoch.serializer.serialize(t.lastModified, out, version); + } + + public DataPlacements deserialize(DataInputPlus in, Version version) throws IOException + { + int size = in.readInt(); + Map<ReplicationParams, DataPlacement> map = Maps.newHashMapWithExpectedSize(size); + for (int i = 0; i < size; i++) + { + ReplicationParams params = ReplicationParams.serializer.deserialize(in, version); + DataPlacement dp = DataPlacement.serializer.deserialize(in, version); + map.put(params, dp); + } + Epoch lastModified = Epoch.serializer.deserialize(in, version); + return new DataPlacements(lastModified, map); + } + + public long serializedSize(DataPlacements t, Version version) + { + int size = sizeof(t.size()); + for (Map.Entry<ReplicationParams, DataPlacement> entry : t.asMap().entrySet()) + { + size += ReplicationParams.serializer.serializedSize(entry.getKey(), version); + size += DataPlacement.serializer.serializedSize(entry.getValue(), version); + } + size += Epoch.serializer.serializedSize(t.lastModified, version); + return size; + } + } +} diff --git a/src/java/org/apache/cassandra/tcm/ownership/Delta.java b/src/java/org/apache/cassandra/tcm/ownership/Delta.java new file mode 100644 index 0000000000..a0fe3ba2d8 --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/ownership/Delta.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.ownership; + +import java.io.IOException; +import java.util.Objects; + +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.locator.RangesByEndpoint; +import org.apache.cassandra.tcm.serialization.MetadataSerializer; +import org.apache.cassandra.tcm.serialization.Version; + +public class Delta +{ + public static final Serializer serializer = new Serializer(); + + private static final Delta EMPTY = new Delta(RangesByEndpoint.EMPTY, RangesByEndpoint.EMPTY); + + public final RangesByEndpoint removals; + public final RangesByEndpoint additions; + + public Delta(RangesByEndpoint removals, RangesByEndpoint additions) + { + this.removals = removals; + this.additions = additions; + } + + public Delta onlyAdditions() + { + return new Delta(RangesByEndpoint.EMPTY, additions); + } + + public Delta onlyRemovals() + { + return new Delta(removals, RangesByEndpoint.EMPTY); + } + + public Delta merge(Delta other) + { + return new Delta(merge(removals, other.removals), + merge(additions, other.additions)); + } + + public Delta invert() + { + return new Delta(additions, removals); + } + + public static RangesByEndpoint merge(RangesByEndpoint...byEndpoints) + { + RangesByEndpoint.Builder builder = new RangesByEndpoint.Builder(); + for (RangesByEndpoint rbe : byEndpoints) + { + rbe.asMap() + .forEach((endpoint, replicas) -> replicas.forEach(replica -> builder.put(endpoint, replica))); + } + return builder.build(); + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Delta delta = (Delta) o; + + return Objects.equals(removals, delta.removals) && Objects.equals(additions, delta.additions); + } + + public int hashCode() + { + return Objects.hash(removals, additions); + } + + @Override + public String toString() + { + return "Delta{" + + "removals=" + removals + + ", additions=" + additions + + '}'; + } + + public static Delta empty() + { + return EMPTY; + } + + public static final class Serializer implements MetadataSerializer<Delta> + { + public void serialize(Delta t, DataOutputPlus out, Version version) throws IOException + { + RangesByEndpoint.serializer.serialize(t.removals, out, version); + RangesByEndpoint.serializer.serialize(t.additions, out, version); + } + + public Delta deserialize(DataInputPlus in, Version version) throws IOException + { + return new Delta(RangesByEndpoint.serializer.deserialize(in, version), + RangesByEndpoint.serializer.deserialize(in, version)); + } + + public long serializedSize(Delta t, Version version) + { + return RangesByEndpoint.serializer.serializedSize(t.removals, version) + + RangesByEndpoint.serializer.serializedSize(t.additions, version); + } + } +} diff --git a/src/java/org/apache/cassandra/tcm/ownership/GlobalPlacementDelta.java b/src/java/org/apache/cassandra/tcm/ownership/GlobalPlacementDelta.java new file mode 100644 index 0000000000..a2ec8a2074 --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/ownership/GlobalPlacementDelta.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.ownership; + +//import java.io.IOException; +//import java.util.*; +// +//import org.apache.cassandra.db.TypeSizes; +//import org.apache.cassandra.io.IVersionedSerializer; +//import org.apache.cassandra.io.util.DataInputPlus; +//import org.apache.cassandra.io.util.DataOutputPlus; +//import org.apache.cassandra.schema.ReplicationParams; +// +//public class GlobalPlacementDelta +//{ +// public static final Serializer serializer = new Serializer(); +// public final PlacementDeltas readDeltas; +// public final PlacementDeltas writeDeltas; +// +// private GlobalPlacementDelta(PlacementDeltas readDeltas, PlacementDeltas writeDeltas) +// { +// this.readDeltas = readDeltas; +// this.writeDeltas = writeDeltas; +// } +// +// public DataPlacements apply(DataPlacements placements) +// { +// return apply(placements, readDeltas) +// return DataPlacements.builder() +// .reads(apply(placements.reads, readDeltas)) +// .writes(apply(placements.writes, writeDeltas)) +// .build(); +// } +// +// public static DataPlacements apply(DataPlacements before, PlacementDeltas delta) +// { +// DataPlacements.Builder mapBuilder = DataPlacements.builder(before.size()); +// before.asMap().forEach((params, placement) -> { +// PlacementDeltas.PlacementDelta d = delta.get(params); +// if (d == null) +// mapBuilder.put(params, placement); +// else +// mapBuilder.put(params, d.apply(placement)); +// }); +// return mapBuilder.build(); +// } +// +// public boolean equals(Object o) +// { +// if (this == o) return true; +// if (o == null || getClass() != o.getClass()) return false; +// GlobalPlacementDelta that = (GlobalPlacementDelta) o; +// return Objects.equals(readDeltas, that.readDeltas) && Objects.equals(writeDeltas, that.writeDeltas); +// } +// +// public int hashCode() +// { +// return Objects.hash(readDeltas, writeDeltas); +// } +// +// public String toString() +// { +// return "PlacementDelta{" + +// "reads=" + readDeltas + +// ", writes=" + writeDeltas + +// '}'; +// } +// +// public static Builder builder() +// { +// return new Builder(); +// } +// +// public static final class Builder +// { +// PlacementDeltas.Builder reads = PlacementDeltas.builder(); +// PlacementDeltas.Builder writes = PlacementDeltas.builder(); +// +// public Builder withReadDelta(ReplicationParams params, Delta delta) +// { +// reads.put(params, delta); +// return this; +// } +// +// public Builder withWriteDelta(ReplicationParams params, Delta delta) +// { +// writes.put(params, delta); +// return this; +// } +// +// public GlobalPlacementDelta build() +// { +// return new GlobalPlacementDelta(reads.build(), writes.build()); +// } +// } +// +// public static class Serializer implements IVersionedSerializer<GlobalPlacementDelta> +// { +// +// public void serialize(GlobalPlacementDelta t, DataOutputPlus out, int version) throws IOException +// { +// out.writeInt(t.readDeltas.size()); +// for (Map.Entry<ReplicationParams, Delta> e : 
t.readDeltas) +// { +// ReplicationParams.serializer.serialize(e.getKey(), out, version); +// Delta.serializer.serialize(e.getValue(), out, version); +// } +// out.writeInt(t.writeDeltas.size()); +// for (Map.Entry<ReplicationParams, Delta> e : t.writeDeltas) +// { +// ReplicationParams.serializer.serialize(e.getKey(), out, version); +// Delta.serializer.serialize(e.getValue(), out, version); +// } +// } +// +// public GlobalPlacementDelta deserialize(DataInputPlus in, int version) throws IOException +// { +// int size = in.readInt(); +// PlacementDeltas.Builder reads = PlacementDeltas.builder(size); +// for (int i = 0; i < size; i++) +// reads.put(ReplicationParams.serializer.deserialize(in, version), Delta.serializer.deserialize(in, version)); +// size = in.readInt(); +// PlacementDeltas.Builder writes = PlacementDeltas.builder(size); +// for (int i = 0; i < size; i++) +// writes.put(ReplicationParams.serializer.deserialize(in, version), Delta.serializer.deserialize(in, version)); +// +// return new GlobalPlacementDelta(reads.build(), writes.build()); +// } +// +// public long serializedSize(GlobalPlacementDelta t, int version) +// { +// long size = TypeSizes.INT_SIZE; +// for (Map.Entry<ReplicationParams, Delta> e : t.readDeltas) +// { +// size += ReplicationParams.serializer.serializedSize(e.getKey(), version); +// size += Delta.serializer.serializedSize(e.getValue(), version); +// } +// size += TypeSizes.INT_SIZE; +// for (Map.Entry<ReplicationParams, Delta> e : t.writeDeltas) +// { +// size += ReplicationParams.serializer.serializedSize(e.getKey(), version); +// size += Delta.serializer.serializedSize(e.getValue(), version); +// } +// return size; +// } +// } +//} diff --git a/src/java/org/apache/cassandra/tcm/ownership/MovementMap.java b/src/java/org/apache/cassandra/tcm/ownership/MovementMap.java new file mode 100644 index 0000000000..0d2e15b2c5 --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/ownership/MovementMap.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tcm.ownership; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import org.apache.cassandra.locator.EndpointsByReplica; +import org.apache.cassandra.schema.ReplicationParams; + +public class MovementMap extends ReplicationMap<EndpointsByReplica> +{ + private static final MovementMap EMPTY = new MovementMap(Collections.emptyMap()); + + private MovementMap(Map<ReplicationParams, EndpointsByReplica> map) + { + super(map); + } + + protected EndpointsByReplica defaultValue() + { + return new EndpointsByReplica.Builder().build(); + } + + protected EndpointsByReplica localOnly() + { + throw new IllegalStateException("Cannot move local ranges"); + } + + public String toString() + { + return "MovementMap{" + + "map=" + asMap() + + '}'; + } + + public static MovementMap empty() + { + return EMPTY; + } + + public static Builder builder() + { + return new Builder(new HashMap<>()); + } + + public static Builder builder(int expectedSize) + { + return new Builder(new HashMap<>(expectedSize)); + } + + public static class Builder + { + private final Map<ReplicationParams, EndpointsByReplica> map; + private Builder(Map<ReplicationParams, EndpointsByReplica> map) + { + this.map = map; + } + + public Builder put(ReplicationParams params, EndpointsByReplica placement) + { + map.put(params, placement); + return this; + } + + public MovementMap build() + { + return new MovementMap(map); + } + } + +} diff --git a/src/java/org/apache/cassandra/tcm/ownership/PlacementDeltas.java b/src/java/org/apache/cassandra/tcm/ownership/PlacementDeltas.java new file mode 100644 index 0000000000..b54806834d --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/ownership/PlacementDeltas.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tcm.ownership; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import org.apache.cassandra.db.TypeSizes; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.schema.ReplicationParams; +import org.apache.cassandra.tcm.serialization.MetadataSerializer; +import org.apache.cassandra.tcm.serialization.Version; + +public class PlacementDeltas extends ReplicationMap<PlacementDeltas.PlacementDelta> +{ + public static final Serializer serializer = new Serializer(); + private static final PlacementDeltas EMPTY = new PlacementDeltas(Collections.emptyMap()); + + private PlacementDeltas(Map<ReplicationParams, PlacementDelta> map) + { + super(map); + } + + protected PlacementDelta defaultValue() + { + return PlacementDelta.EMPTY; + } + + protected PlacementDelta localOnly() + { + throw new IllegalStateException("Cannot apply diff to local placements."); + } + + public PlacementDeltas invert() + { + Builder inverse = builder(size()); + asMap().forEach((params, delta) -> inverse.put(params, delta.invert())); + return inverse.build(); + } + + @Override + public String toString() + { + return "DeltaMap{" + + "map=" + asMap() + + '}'; + } + + public DataPlacements apply(DataPlacements placements) + { + DataPlacements.Builder builder = placements.unbuild(); + asMap().forEach((params, delta) -> { + DataPlacement previous = placements.get(params); + builder.with(params, delta.apply(previous)); + }); + return builder.build(); + } + + public static PlacementDeltas empty() + { + return EMPTY; + } + + public static Builder builder() + { + return new Builder(new HashMap<>()); + } + + public static Builder builder(int expectedSize) + { + return new Builder(new HashMap<>(expectedSize)); + } + + public static Builder builder(Map<ReplicationParams, PlacementDelta> map) + { + return new Builder(map); + } + + public static class PlacementDelta + { + public static PlacementDelta EMPTY = new PlacementDelta(Delta.empty(), Delta.empty()); + + public final Delta reads; + public final Delta writes; + + public PlacementDelta(Delta reads, Delta writes) + { + this.reads = reads; + this.writes = writes; + } + + public PlacementDelta onlyReads() + { + return new PlacementDelta(reads, Delta.empty()); + } + + public PlacementDelta onlyWrites() + { + return new PlacementDelta(Delta.empty(), writes); + } + + public PlacementDelta onlyAdditions() + { + return new PlacementDelta(reads.onlyAdditions(), writes.onlyAdditions()); + } + + public PlacementDelta onlyRemovals() + { + return new PlacementDelta(reads.onlyRemovals(), writes.onlyRemovals()); + } + + public DataPlacement apply(DataPlacement placement) + { + DataPlacement.Builder builder = placement.unbuild(); + reads.additions.flattenValues().forEach(builder.reads::withReplica); + writes.additions.flattenValues().forEach(builder.writes::withReplica); + + reads.removals.flattenValues().forEach(builder.reads::withoutReplica); + writes.removals.flattenValues().forEach(builder.writes::withoutReplica); + return builder.build(); + } + + public PlacementDelta merge(PlacementDelta other) + { + return new PlacementDelta(reads.merge(other.reads), + writes.merge(other.writes)); + } + + public PlacementDelta invert() + { + return new PlacementDelta(reads.invert(), writes.invert()); + } + + public String toString() + { + return "PlacementDelta{" + + "reads=" + reads + + ", writes=" + 
writes + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + PlacementDelta other = (PlacementDelta) o; + + return Objects.equals(reads, other.reads) && Objects.equals(writes, other.writes); + } + + @Override + public int hashCode() + { + return Objects.hash(reads, writes); + } + } + + public static class Builder + { + private final Map<ReplicationParams, PlacementDelta> map; + private Builder(Map<ReplicationParams, PlacementDelta> map) + { + this.map = map; + } + + public Builder put(ReplicationParams params, PlacementDelta placement) + { + PlacementDelta delta = map.get(params); + if (delta == null) + map.put(params, placement); + else + map.put(params, delta.merge(placement)); + return this; + } + + public PlacementDeltas build() + { + return new PlacementDeltas(map); + } + } + + public static class Serializer implements MetadataSerializer<PlacementDeltas> + { + public void serialize(PlacementDeltas t, DataOutputPlus out, Version version) throws IOException + { + out.writeInt(t.size()); + for (Map.Entry<ReplicationParams, PlacementDelta> e : t.asMap().entrySet()) + { + ReplicationParams.serializer.serialize(e.getKey(), out, version); + Delta.serializer.serialize(e.getValue().reads, out, version); + Delta.serializer.serialize(e.getValue().writes, out, version); + } + + } + + public PlacementDeltas deserialize(DataInputPlus in, Version version) throws IOException + { + int size = in.readInt(); + Builder builder = PlacementDeltas.builder(size); + for (int i = 0; i < size; i++) + { + ReplicationParams replicationParams = ReplicationParams.serializer.deserialize(in, version); + Delta reads = Delta.serializer.deserialize(in, version); + Delta writes = Delta.serializer.deserialize(in, version); + builder.put(replicationParams, new PlacementDelta(reads, writes)); + } + return builder.build(); + } + + public long serializedSize(PlacementDeltas t, Version version) + { + long size = TypeSizes.INT_SIZE; + for (Map.Entry<ReplicationParams, PlacementDelta> e : t.asMap().entrySet()) + { + size += ReplicationParams.serializer.serializedSize(e.getKey(), version); + size += Delta.serializer.serializedSize(e.getValue().reads, version); + size += Delta.serializer.serializedSize(e.getValue().writes, version); + } + return size; + } + } +} diff --git a/src/java/org/apache/cassandra/tcm/ownership/PlacementForRange.java b/src/java/org/apache/cassandra/tcm/ownership/PlacementForRange.java new file mode 100644 index 0000000000..aa5f90020c --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/ownership/PlacementForRange.java @@ -0,0 +1,525 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.cassandra.tcm.ownership;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
+
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.locator.AbstractReplicaCollection;
+import org.apache.cassandra.locator.EndpointsForRange;
+import org.apache.cassandra.locator.EndpointsForToken;
+import org.apache.cassandra.locator.InetAddressAndPort;
+import org.apache.cassandra.locator.RangesAtEndpoint;
+import org.apache.cassandra.locator.RangesByEndpoint;
+import org.apache.cassandra.locator.Replica;
+import org.apache.cassandra.locator.ReplicaCollection;
+import org.apache.cassandra.tcm.serialization.MetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+
+import static org.apache.cassandra.db.TypeSizes.sizeof;
+
+public class PlacementForRange
+{
+    public static final Serializer serializer = new Serializer();
+
+    public static final PlacementForRange EMPTY = PlacementForRange.builder().build();
+
+    final SortedMap<Range<Token>, EndpointsForRange> replicaGroups;
+
+    public PlacementForRange(Map<Range<Token>, EndpointsForRange> replicaGroups)
+    {
+        this.replicaGroups = new TreeMap<>(replicaGroups);
+    }
+
+    @VisibleForTesting
+    public Map<Range<Token>, EndpointsForRange> replicaGroups()
+    {
+        return Collections.unmodifiableMap(replicaGroups);
+    }
+
+    @VisibleForTesting
+    public List<Range<Token>> ranges()
+    {
+        List<Range<Token>> ranges = new ArrayList<>(replicaGroups.keySet());
+        ranges.sort(Range::compareTo);
+        return ranges;
+    }
+
+    @VisibleForTesting
+    public EndpointsForRange forRange(Range<Token> range) // TODO: rename to `forRangeRead`?
+    {
+        // can't use range.isWrapAround() since range.unwrap() returns a wrapping range (right token is min value)
+        assert range.right.compareTo(range.left) > 0 || range.right.equals(range.right.minValue());
+        return replicaGroups.get(range);
+    }
+
+    /**
+     * This method is intended for use on the read/write path; unlike forRange, it matches every
+     * replica group whose range fully contains the given range, rather than requiring an exact key match.
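+     * <p>Illustrative example (an editorial addition; the endpoints A, B, C are assumed and do
+     * not appear in this patch): given replica groups (0,100] -> {A, B} and (100,200] -> {B, C},
+     * matchRange((20,50]) returns {A, B}, since only (0,100] fully contains (20,50].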
+ */ + public EndpointsForRange matchRange(Range<Token> range) + { + EndpointsForRange.Builder builder = new EndpointsForRange.Builder(range); + for (Map.Entry<Range<Token>, EndpointsForRange> entry : replicaGroups.entrySet()) + { + if (entry.getKey().contains(range)) + builder.addAll(entry.getValue(), ReplicaCollection.Builder.Conflict.ALL); + } + return builder.build(); + } + + public EndpointsForRange forRange(Token token) + { + for (Map.Entry<Range<Token>, EndpointsForRange> entry : replicaGroups.entrySet()) + { + if (entry.getKey().contains(token)) + return entry.getValue(); + } + throw new IllegalStateException("Could not find range for token " + token + " in PlacementForRange: " + replicaGroups); + } + + // TODO: this could be improved by searching it rather than iterating over it + public EndpointsForToken forToken(Token token) + { + EndpointsForToken.Builder builder = new EndpointsForToken.Builder(token); + for (Map.Entry<Range<Token>, EndpointsForRange> entry : replicaGroups.entrySet()) + { + if (entry.getKey().contains(token)) + builder.addAll(entry.getValue().forToken(token), ReplicaCollection.Builder.Conflict.ALL); + } + return builder.build(); + } + + public Delta difference(PlacementForRange next) + { + RangesByEndpoint oldMap = this.byEndpoint(); + RangesByEndpoint newMap = next.byEndpoint(); + return new Delta(diff(oldMap, newMap), diff(newMap, oldMap)); + } + + @VisibleForTesting + public RangesByEndpoint byEndpoint() + { + RangesByEndpoint.Builder builder = new RangesByEndpoint.Builder(); + for (Map.Entry<Range<Token>, EndpointsForRange> oldPlacement : this.replicaGroups.entrySet()) + oldPlacement.getValue().byEndpoint().forEach(builder::put); + return builder.build(); + } + + // TODO do this without using a builder to collate and without using subtractSameReplication + // i.e. by directly removing the replicas so that we don't need to care about the ordering of with/without. + public PlacementForRange without(RangesByEndpoint toRemove) + { + Builder builder = builder(); + RangesByEndpoint currentByEndpoint = this.byEndpoint(); + for (Map.Entry<InetAddressAndPort, RangesAtEndpoint> endPointRanges : currentByEndpoint.entrySet()) + { + InetAddressAndPort endpoint = endPointRanges.getKey(); + RangesAtEndpoint currentRanges = endPointRanges.getValue(); + RangesAtEndpoint removeRanges = toRemove.get(endpoint); + for (Replica oldReplica : currentRanges) + { + RangesAtEndpoint toRetain = oldReplica.subtractSameReplication(removeRanges); + toRetain.forEach(builder::withReplica); + } + } + return builder.build(); + } + + // TODO do this without using a builder, i.e. by directly adding the replicas + // (and directly removing them in without) so that we don't need to care + // about the ordering of with/without. 
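// (Editorial sketch of the ordering concern in the TODO above; the endpoints A, B, C and the
//  helper rangesOf(...) are assumed, not part of this patch:
//      starting from (0,100] -> {A, B},
//      placement.without(rangesOf(B))  ->  (0,100] -> {A}
//               .with(rangesOf(C))     ->  (0,100] -> {A, C}
//  each step eagerly rebuilds the placement via a Builder, which is why the relative order
//  of with/without applications currently matters.)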
+ public PlacementForRange with(RangesByEndpoint toAdd) + { + Builder builder = builder(); + for (EndpointsForRange current : replicaGroups.values()) + builder.withReplicaGroup(current); + for (Replica newReplica : toAdd.flattenValues()) + builder.withReplica(newReplica); + return builder.build(); + } + + private RangesByEndpoint diff(RangesByEndpoint left, RangesByEndpoint right) + { + RangesByEndpoint.Builder builder = new RangesByEndpoint.Builder(); + for (Map.Entry<InetAddressAndPort, RangesAtEndpoint> endPointRanges : left.entrySet()) + { + InetAddressAndPort endpoint = endPointRanges.getKey(); + RangesAtEndpoint leftRanges = endPointRanges.getValue(); + RangesAtEndpoint rightRanges = right.get(endpoint); + for (Replica leftReplica : leftRanges) + { + if (!rightRanges.contains(leftReplica)) + builder.put(endpoint, leftReplica); + } + } + return builder.build(); + } + + @Override + public String toString() + { + return replicaGroups.toString(); + } + + @VisibleForTesting + public Map<String, List<String>> toStringByEndpoint() + { + Map<String, List<String>> mappings = new HashMap<>(); + for (Map.Entry<InetAddressAndPort, RangesAtEndpoint> entry : byEndpoint().entrySet()) + mappings.put(entry.getKey().toString(), entry.getValue().asList(r -> r.range().toString())); + return mappings; + } + + @VisibleForTesting + public List<String> toReplicaStringList() + { + return replicaGroups.values() + .stream() + .flatMap(AbstractReplicaCollection::stream) + .map(Replica::toString) + .collect(Collectors.toList()); + } + + public Builder unbuild() + { + return new Builder(new HashMap<>(replicaGroups)); + } + + public static Builder builder() + { + return new Builder(); + } + + public static Builder builder(int expectedSize) + { + return new Builder(expectedSize); + } + + @VisibleForTesting + public static PlacementForRange mergeRangesForPlacement(Set<Token> leavingTokens, PlacementForRange placement) + { + if (placement.replicaGroups.isEmpty()) + return placement; + + Builder newPlacement = PlacementForRange.builder(); + List<Map.Entry<Range<Token>, EndpointsForRange>> sortedEntries = new ArrayList<>(placement.replicaGroups.entrySet()); + sortedEntries.sort(Comparator.comparing(e -> e.getKey().left)); + Iterator<Map.Entry<Range<Token>, EndpointsForRange>> iter = sortedEntries.iterator(); + EndpointsForRange stashedReplicas = null; + + // TODO verify with transient replication. For example, if we have ranges [0,100] -> A, B and + // [100,200] -> Atransient, B, we would currently not be able to merge those as A != Atransient + while (iter.hasNext()) + { + Map.Entry<Range<Token>, EndpointsForRange> entry = iter.next(); + Range<Token> currentRange = entry.getKey(); + EndpointsForRange currentReplicas = entry.getValue(); + // current range ends with one of the tokens being removed so we will want to merge it with the neighbouring + // range, potentially with a number of subsequent ranges if there is an unbroken sequence of leaving tokens. + if (leavingTokens.contains(currentRange.right)) + { + // the previous range (if there was one) did not end in a leaving token, so the stash is empty + if (stashedReplicas == null) + stashedReplicas = currentReplicas; + // we already have a stashed replica group. Its range must be contiguous with this one so extend the + // range, joining them. This asserts that the two sets of replicas match in terms of endpoints and + // full/transient status too. 
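// (Editorial illustration of the stash-and-merge flow, with tokens and endpoints assumed,
//  not from this patch: given leavingTokens = {100} and groups (0,100] -> {A, B} and
//  (100,200] -> {A, B}, the first iteration stashes {A, B}@(0,100] because its right token
//  is leaving, and the second merges the stash with (100,200] to emit a single
//  (0,200] -> {A, B} group.)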
+ else + stashedReplicas = mergeReplicaGroups(stashedReplicas, currentReplicas); + } + // current range does not end in a leaving token + else + { + // nothing stashed to merge, so add current replica group as it is + if (stashedReplicas == null) + newPlacement.withReplicaGroup(currentReplicas); + else + { + // we have stashed the preceding replica set, so merge it with this one, add it to the new + // placement and clear the stash + newPlacement.withReplicaGroup(mergeReplicaGroups(stashedReplicas, currentReplicas)); + stashedReplicas = null; + } + } + } + if (null != stashedReplicas) + newPlacement.withReplicaGroup(stashedReplicas); + + return newPlacement.build(); + } + + private static EndpointsForRange mergeReplicaGroups(EndpointsForRange left, EndpointsForRange right) + { + assert sameReplicas(left, right); + Range<Token> mergedRange = new Range<>(left.range().left, right.range().right); + return EndpointsForRange.builder(mergedRange, left.size()) + .addAll(left.asList(r -> new Replica(r.endpoint(), mergedRange, r.isFull()))) + .build(); + } + + private static boolean sameReplicas(EndpointsForRange left, EndpointsForRange right) + { + if (left.size() != right.size()) + return false; + + Comparator<Replica> comparator = Comparator.comparing(Replica::endpoint); + EndpointsForRange l = left.sorted(comparator); + EndpointsForRange r = right.sorted(comparator); + for (int i = 0; i < l.size(); i++) + { + Replica r1 = l.get(i); + Replica r2 = r.get(i); + if (!(r1.endpoint().equals(r2.endpoint())) || r1.isFull() != r2.isFull()) + return false; + } + return true; + } + + @VisibleForTesting + public static PlacementForRange splitRangesForPlacement(List<Token> tokens, PlacementForRange placement) + { + if (placement.replicaGroups.isEmpty()) + return placement; + + Builder newPlacement = PlacementForRange.builder(); + List<EndpointsForRange> eprs = new ArrayList<>(placement.replicaGroups.values()); + eprs.sort(Comparator.comparing(a -> a.range().left)); + Token min = eprs.get(0).range().left; + Token max = eprs.get(eprs.size() - 1).range().right; + + // if any token is < the start or > the end of the ranges covered, error + if (tokens.get(0).compareTo(min) < 0 || (!max.equals(min) && tokens.get(tokens.size()-1).compareTo(max) > 0)) + throw new IllegalArgumentException("New tokens exceed total bounds of current placement ranges " + tokens + " " + eprs); + Iterator<EndpointsForRange> iter = eprs.iterator(); + EndpointsForRange current = iter.next(); + for (Token token : tokens) + { + // handle special case where one of the tokens is the min value + if (token.equals(min)) + continue; + + assert current != null : tokens + " " + eprs; + Range<Token> r = current.range(); + int cmp = token.compareTo(r.right); + if (cmp == 0) + { + newPlacement.withReplicaGroup(current); + if (iter.hasNext()) + current = iter.next(); + else + current = null; + } + else if (cmp < 0 || r.right.isMinimum()) + { + Range<Token> left = new Range<>(r.left, token); + Range<Token> right = new Range<>(token, r.right); + newPlacement.withReplicaGroup(EndpointsForRange.builder(left) + .addAll(current.asList(rep->rep.decorateSubrange(left))) + .build()); + current = EndpointsForRange.builder(right) + .addAll(current.asList(rep->rep.decorateSubrange(right))) + .build(); + } + } + + if (current != null) + newPlacement.withReplicaGroup(current); + + return newPlacement.build(); + } + + public static class Builder + { + private final Map<Range<Token>, EndpointsForRange> replicaGroups; + + private Builder() + { + this(new HashMap<>()); + 
} + + private Builder(int expectedSize) + { + this(new HashMap<>(expectedSize)); + } + + private Builder(Map<Range<Token>, EndpointsForRange> replicaGroups) + { + this.replicaGroups = replicaGroups; + } + + public Builder withReplica(Replica replica) + { + EndpointsForRange group = + replicaGroups.computeIfPresent(replica.range(), + (t, old) -> old.newBuilder(old.size() + 1) + .addAll(old) + .add(replica, ReplicaCollection.Builder.Conflict.ALL) + .build()); + if (group == null) + replicaGroups.put(replica.range(), EndpointsForRange.of(replica)); + return this; + + } + + public Builder withoutReplica(Replica replica) + { + Range<Token> range = replica.range(); + EndpointsForRange group = replicaGroups.get(range); + if (group == null) + throw new IllegalArgumentException(String.format("No group found for range of supplied replica %s (%s)", + replica, range)); + EndpointsForRange without = group.without(Collections.singleton(replica.endpoint())); + if (without.isEmpty()) + replicaGroups.remove(range); + else + replicaGroups.put(range, without); + return this; + } + + public Builder withReplicaGroup(EndpointsForRange replicas) + { + EndpointsForRange group = + replicaGroups.computeIfPresent(replicas.range(), + (t, old) -> replicas.newBuilder(replicas.size() + old.size()) + .addAll(old) + .addAll(replicas, ReplicaCollection.Builder.Conflict.ALL) + .build()); + if (group == null) + replicaGroups.put(replicas.range(), replicas); + return this; + } + + public Builder withReplicaGroups(Iterable<EndpointsForRange> replicas) + { + replicas.forEach(this::withReplicaGroup); + return this; + } + + public PlacementForRange build() + { + return new PlacementForRange(this.replicaGroups); + } + } + + public static class Serializer implements MetadataSerializer<PlacementForRange> + { + public void serialize(PlacementForRange t, DataOutputPlus out, Version version) throws IOException + { + out.writeInt(t.replicaGroups.size()); + + for (Map.Entry<Range<Token>, EndpointsForRange> entry : t.replicaGroups.entrySet()) + { + Range<Token> range = entry.getKey(); + EndpointsForRange efr = entry.getValue(); + Token.metadataSerializer.serialize(range.left, out, version); + Token.metadataSerializer.serialize(range.right, out, version); + out.writeInt(efr.size()); + for (int i = 0; i < efr.size(); i++) + { + Replica r = efr.get(i); + Token.metadataSerializer.serialize(r.range().left, out, version); + Token.metadataSerializer.serialize(r.range().right, out, version); + InetAddressAndPort.MetadataSerializer.serializer.serialize(r.endpoint(), out, version); + out.writeBoolean(r.isFull()); + } + } + } + + public PlacementForRange deserialize(DataInputPlus in, Version version) throws IOException + { + int groupCount = in.readInt(); + Map<Range<Token>, EndpointsForRange> result = Maps.newHashMapWithExpectedSize(groupCount); + for (int i = 0; i < groupCount; i++) + { + Range<Token> range = new Range<>(Token.metadataSerializer.deserialize(in, version), + Token.metadataSerializer.deserialize(in, version)); + int replicaCount = in.readInt(); + List<Replica> replicas = new ArrayList<>(replicaCount); + for (int x = 0; x < replicaCount; x++) + { + Range<Token> replicaRange = new Range<>(Token.metadataSerializer.deserialize(in, version), + Token.metadataSerializer.deserialize(in, version)); + InetAddressAndPort replicaAddress = InetAddressAndPort.MetadataSerializer.serializer.deserialize(in, version); + boolean isFull = in.readBoolean(); + replicas.add(new Replica(replicaAddress, replicaRange, isFull)); + + } + EndpointsForRange efr = 
EndpointsForRange.copyOf(replicas); + result.put(range, efr); + } + return new PlacementForRange(result); + } + + public long serializedSize(PlacementForRange t, Version version) + { + int size = sizeof(t.replicaGroups.size()); + for (Map.Entry<Range<Token>, EndpointsForRange> entry : t.replicaGroups.entrySet()) + { + Range<Token> range = entry.getKey(); + EndpointsForRange efr = entry.getValue(); + + size += Token.metadataSerializer.serializedSize(range.left, version); + size += Token.metadataSerializer.serializedSize(range.right, version); + size += sizeof(efr.size()); + for (int i = 0; i < efr.size(); i++) + { + Replica r = efr.get(i); + size += Token.metadataSerializer.serializedSize(r.range().left, version); + size += Token.metadataSerializer.serializedSize(r.range().right, version); + size += InetAddressAndPort.MetadataSerializer.serializer.serializedSize(r.endpoint(), version); + size += sizeof(r.isFull()); + } + } + return size; + } + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (!(o instanceof PlacementForRange)) return false; + PlacementForRange that = (PlacementForRange) o; + return Objects.equals(replicaGroups, that.replicaGroups); + } + + @Override + public int hashCode() + { + return Objects.hash(replicaGroups); + } +} diff --git a/src/java/org/apache/cassandra/tcm/ownership/PlacementProvider.java b/src/java/org/apache/cassandra/tcm/ownership/PlacementProvider.java new file mode 100644 index 0000000000..8ab93c47c0 --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/ownership/PlacementProvider.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.ownership; + +import java.util.List; +import java.util.Set; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.schema.Keyspaces; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.membership.NodeId; + +public interface PlacementProvider +{ + DataPlacements calculatePlacements(List<Range<Token>> ranges, ClusterMetadata metadata, Keyspaces keyspaces); + // TODO naming + PlacementTransitionPlan planForJoin(ClusterMetadata metadata, + NodeId joining, + Set<Token> tokens, + Keyspaces keyspaces); + + PlacementTransitionPlan planForMove(ClusterMetadata metadata, + NodeId nodeId, + Set<Token> tokens, + Keyspaces keyspaces); + + // TODO: maybe leave, for consistency? 
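
Not part of the patch: to make the range-splitting logic in PlacementForRange above concrete, here is a rough, test-style sketch of how splitRangesForPlacement is expected to behave. The endpoint, the token values and the accessibility of PlacementForRange.builder() are assumptions made purely for illustration.

    import java.util.Arrays;
    import java.util.List;

    import org.apache.cassandra.dht.Murmur3Partitioner;
    import org.apache.cassandra.dht.Range;
    import org.apache.cassandra.dht.Token;
    import org.apache.cassandra.locator.EndpointsForRange;
    import org.apache.cassandra.locator.InetAddressAndPort;
    import org.apache.cassandra.locator.Replica;
    import org.apache.cassandra.tcm.ownership.PlacementForRange;

    public class SplitRangesSketch
    {
        public static void main(String[] args) throws Exception
        {
            InetAddressAndPort node = InetAddressAndPort.getByName("127.0.0.1");
            // a placement with a single replica group owning (0, 400]
            Range<Token> range = new Range<>(new Murmur3Partitioner.LongToken(0),
                                             new Murmur3Partitioner.LongToken(400));
            PlacementForRange placement =
                PlacementForRange.builder()
                                 .withReplicaGroup(EndpointsForRange.of(Replica.fullReplica(node, range)))
                                 .build();
            // splitting at 100 and 200 should yield (0,100], (100,200] and (200,400],
            // each owned by the same endpoint, decorated to the new subrange
            List<Token> tokens = Arrays.asList(new Murmur3Partitioner.LongToken(100),
                                               new Murmur3Partitioner.LongToken(200));
            System.out.println(PlacementForRange.splitRangesForPlacement(tokens, placement));
        }
    }
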
+ PlacementTransitionPlan planForDecommission(ClusterMetadata metadata, + NodeId nodeId, + Keyspaces keyspaces); + + PlacementTransitionPlan planForReplacement(ClusterMetadata metadata, + NodeId replaced, + NodeId replacement, + Keyspaces keyspaces); +} diff --git a/src/java/org/apache/cassandra/tcm/ownership/PlacementTransitionPlan.java b/src/java/org/apache/cassandra/tcm/ownership/PlacementTransitionPlan.java new file mode 100644 index 0000000000..edb22919dc --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/ownership/PlacementTransitionPlan.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.ownership; + +import org.apache.cassandra.tcm.sequences.LockedRanges; + +/** + * A transition plan contains four elements: + * - deltas to split original ranges, if necessary + * - deltas to move each placement from start to maximal (i.e. overreplicated) state + * - deltas to move each placement from maximal to final state + * - deltas to merge final ranges, if necessary + * These deltas describe the abstract changes necessary to transition between two cluster states. + * When the plan is compiled, these deltas are deconstructed and recomposed into the steps necessary for + * safe execution of the given operation. + */
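
Not part of the patch: a hedged sketch of how a caller might consume a compiled plan for a join, assuming some executor enacts each delta set in order. Only the planForJoin signature and the accessor names come from this commit; JoinSketch and apply() are hypothetical stand-ins.

    import java.util.Set;

    import org.apache.cassandra.dht.Token;
    import org.apache.cassandra.schema.Keyspaces;
    import org.apache.cassandra.tcm.ClusterMetadata;
    import org.apache.cassandra.tcm.membership.NodeId;
    import org.apache.cassandra.tcm.ownership.PlacementDeltas;
    import org.apache.cassandra.tcm.ownership.PlacementProvider;
    import org.apache.cassandra.tcm.ownership.PlacementTransitionPlan;
    import org.apache.cassandra.tcm.sequences.LockedRanges;

    public class JoinSketch
    {
        static void join(PlacementProvider provider, ClusterMetadata metadata,
                         NodeId joining, Set<Token> tokens, Keyspaces keyspaces)
        {
            PlacementTransitionPlan plan = provider.planForJoin(metadata, joining, tokens, keyspaces);
            // ranges touched by any phase must be locked before execution starts
            LockedRanges.AffectedRanges affected = plan.affectedRanges();
            apply(plan.addToWrites());      // new replicas begin receiving writes
            apply(plan.moveReads());        // reads shift once data has been streamed
            apply(plan.removeFromWrites()); // replicas losing ranges stop receiving writes
        }

        static void apply(PlacementDeltas deltas)
        {
            // hypothetical stand-in for whatever enacts a set of placement deltas
        }
    }
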
+public class PlacementTransitionPlan +{ + public final PlacementDeltas toSplit; + public final PlacementDeltas toMaximal; + public final PlacementDeltas toFinal; + public final PlacementDeltas toMerged; + + private PlacementDeltas addToWrites; + private PlacementDeltas moveReads; + private PlacementDeltas removeFromWrites; + private LockedRanges.AffectedRanges affectedRanges; + + public PlacementTransitionPlan(PlacementDeltas toSplit, + PlacementDeltas toMaximal, + PlacementDeltas toFinal, + PlacementDeltas toMerged) + { + this.toSplit = toSplit; + this.toMaximal = toMaximal; + this.toFinal = toFinal; + this.toMerged = toMerged; + } + + public PlacementDeltas addToWrites() + { + if (addToWrites == null) + compile(); + return addToWrites; + } + public PlacementDeltas moveReads() + { + if (moveReads == null) + compile(); + return moveReads; + } + + public PlacementDeltas removeFromWrites() + { + if (removeFromWrites == null) + compile(); + return removeFromWrites; + } + + public LockedRanges.AffectedRanges affectedRanges() + { + if (affectedRanges == null) + compile(); + return affectedRanges; + } + + private void compile() + { + PlacementDeltas.Builder addToWrites = PlacementDeltas.builder(); + PlacementDeltas.Builder moveReads = PlacementDeltas.builder(); + PlacementDeltas.Builder removeFromWrites = PlacementDeltas.builder(); + LockedRanges.AffectedRangesBuilder affectedRanges = LockedRanges.AffectedRanges.builder(); + + toSplit.forEach((replication, delta) -> { + delta.reads.additions.flattenValues().forEach(r -> affectedRanges.add(replication, r.range())); + }); + + toMaximal.forEach((replication, delta) -> { + delta.reads.additions.flattenValues().forEach(r -> affectedRanges.add(replication, r.range())); + addToWrites.put(replication, delta.onlyWrites()); + moveReads.put(replication, delta.onlyReads()); + }); + + toFinal.forEach((replication, delta) -> { + delta.reads.additions.flattenValues().forEach(r -> affectedRanges.add(replication, r.range())); + moveReads.put(replication, delta.onlyReads()); + removeFromWrites.put(replication, delta.onlyWrites()); + }); + + toMerged.forEach((replication, delta) -> { + delta.reads.additions.flattenValues().forEach(r -> affectedRanges.add(replication, r.range())); + removeFromWrites.put(replication, delta); + }); + this.addToWrites = addToWrites.build(); + this.moveReads = moveReads.build(); + this.removeFromWrites = removeFromWrites.build(); + this.affectedRanges = affectedRanges.build(); + + } + + @Override + public String toString() + { + return "PlacementTransitionPlan{" + + "toSplit=" + toSplit + + ", toMaximal=" + toMaximal + + ", toFinal=" + toFinal + + ", toMerged=" + toMerged + + ", compiled=" + (addToWrites != null) + + '}'; + } +} diff --git a/src/java/org/apache/cassandra/tcm/ownership/ReplicationMap.java b/src/java/org/apache/cassandra/tcm/ownership/ReplicationMap.java new file mode 100644 index 0000000000..80e519f850 --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/ownership/ReplicationMap.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.ownership; + +import java.util.Collections; +import java.util.Iterator; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import com.google.common.collect.ImmutableMap; + +import org.apache.cassandra.schema.ReplicationParams; + +public abstract class ReplicationMap<T> implements Iterable<Map.Entry<ReplicationParams, T>> +{ + private final Map<ReplicationParams, T> map; + + protected ReplicationMap() + { + this(Collections.emptyMap()); + } + + protected ReplicationMap(Map<ReplicationParams, T> map) + { + this.map = map; + } + + protected abstract T defaultValue(); + protected abstract T localOnly(); + + public T get(ReplicationParams params) + { + if (params.isLocal()) + return localOnly(); + return map.getOrDefault(params, defaultValue()); + } + + public int size() + { + return map.size(); + } + + public boolean isEmpty() + { + return map.isEmpty(); + } + + public void forEach(BiConsumer<ReplicationParams, T> consumer) + { + for (Map.Entry<ReplicationParams, T> entry : this) + consumer.accept(entry.getKey(), entry.getValue()); + } + + public ImmutableMap<ReplicationParams, T> asMap() + { + return ImmutableMap.copyOf(map); + } + + public Set<ReplicationParams> keys() + { + return map.keySet(); + } + + public Iterator<Map.Entry<ReplicationParams, T>> iterator() + { + return map.entrySet().iterator(); + } + + public Stream<Map.Entry<ReplicationParams, T>> stream() + { + return StreamSupport.stream(spliterator(), false); + } + + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ReplicationMap<?> that = (ReplicationMap<?>) o; + return map.equals(that.map); + } + + public int hashCode() + { + return Objects.hash(map); + } +} diff --git a/src/java/org/apache/cassandra/tcm/sequences/LockedRanges.java b/src/java/org/apache/cassandra/tcm/sequences/LockedRanges.java new file mode 100644 index 0000000000..a22f8d43f1 --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/sequences/LockedRanges.java @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tcm.sequences; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiConsumer; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; + +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.schema.ReplicationParams; +import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.MetadataValue; +import org.apache.cassandra.tcm.membership.Directory; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.ownership.DataPlacement; +import org.apache.cassandra.tcm.ownership.DataPlacements; +import org.apache.cassandra.tcm.serialization.MetadataSerializer; +import org.apache.cassandra.tcm.serialization.Version; + +import static org.apache.cassandra.db.TypeSizes.sizeof; + +public class LockedRanges implements MetadataValue<LockedRanges> +{ + public static final Serializer serializer = new Serializer(); + public static final LockedRanges EMPTY = new LockedRanges(Epoch.EMPTY, ImmutableMap.<Key, AffectedRanges>builder().build()); + public static final Key NOT_LOCKED = new Key(Epoch.EMPTY); + public final ImmutableMap<Key, AffectedRanges> locked; + private final Epoch lastModified; + + private LockedRanges(Epoch lastModified, ImmutableMap<Key, AffectedRanges> locked) + { + this.lastModified = lastModified; + this.locked = locked; + } + + public LockedRanges lock(Key key, AffectedRanges ranges) + { + assert !key.equals(NOT_LOCKED) : "Can't lock ranges with noop key"; + + if (ranges == AffectedRanges.EMPTY) + return this; + + // TODO might we need the ability for the holder of a key to lock multiple sets over time? 
+ return new LockedRanges(lastModified, + ImmutableMap.<Key, AffectedRanges>builderWithExpectedSize(locked.size()) + .putAll(locked) + .put(key, ranges) + .build()); + } + + public LockedRanges unlock(Key key) + { + if (key.equals(NOT_LOCKED)) + return this; + ImmutableMap.Builder<Key, AffectedRanges> builder = ImmutableMap.builderWithExpectedSize(locked.size()); + locked.forEach((k, r) -> { + if (!k.equals(key)) builder.put(k, r); + }); + return new LockedRanges(lastModified, builder.build()); + } + + public Key intersects(AffectedRanges ranges) + { + for (Map.Entry<Key, AffectedRanges> e : locked.entrySet()) + { + if (ranges.intersects(e.getValue())) + return e.getKey(); + } + return NOT_LOCKED; + } + + @Override + public LockedRanges withLastModified(Epoch epoch) + { + return new LockedRanges(epoch, locked); + } + + @Override + public Epoch lastModified() + { + return lastModified; + } + + @Override + public String toString() + { + return "LockedRanges{" + + "lastModified=" + lastModified + + ", locked=" + locked + + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (!(o instanceof LockedRanges)) return false; + LockedRanges that = (LockedRanges) o; + return Objects.equals(lastModified, that.lastModified) && Objects.equals(locked, that.locked); + } + + @Override + public int hashCode() + { + return Objects.hash(lastModified, locked); + } + + public static Key keyFor(Epoch epoch) + { + return new Key(epoch); + } + + public interface AffectedRangesBuilder + { + AffectedRangesBuilder add(ReplicationParams params, Range<Token> range); + AffectedRanges build(); + } + + public interface AffectedRanges + { + AffectedRanges EMPTY = new AffectedRanges() + { + public boolean intersects(AffectedRanges other) + { + return false; + } + + public void foreach(BiConsumer<ReplicationParams, Set<Range<Token>>> fn) {} + + @Override + public String toString() + { + return "EMPTY"; + } + + public Map<ReplicationParams, Set<Range<Token>>> asMap() + { + return Collections.emptyMap(); + } + }; + + default ImmutableSet<NodeId> toPeers(DataPlacements placements, Directory directory) + { + ImmutableSet.Builder<NodeId> peers = ImmutableSet.builder(); + asMap().forEach((replication, rangeset) -> { + DataPlacement placement = placements.get(replication); + rangeset.stream() + .flatMap(range -> placement.affectedReplicas(range).stream()) + .map(directory::peerId) + .forEach(peers::add); + }); + return peers.build(); + } + + static AffectedRanges singleton(ReplicationParams replicationParams, Range<Token> tokenRange) + { + return builder().add(replicationParams, tokenRange).build(); + } + + static AffectedRangesBuilder builder() + { + return new AffectedRangesImpl(); + } + + boolean intersects(AffectedRanges other); + void foreach(BiConsumer<ReplicationParams, Set<Range<Token>>> fn); + Map<ReplicationParams, Set<Range<Token>>> asMap(); + + final class Serializer implements MetadataSerializer<AffectedRanges> + { + public static final Serializer instance = new Serializer(); + + public void serialize(AffectedRanges t, DataOutputPlus out, Version version) throws IOException + { + Map<ReplicationParams, Set<Range<Token>>> map = t.asMap(); + out.writeInt(map.size()); + for (Map.Entry<ReplicationParams, Set<Range<Token>>> rangeEntry : map.entrySet()) + { + ReplicationParams params = rangeEntry.getKey(); + Set<Range<Token>> ranges = rangeEntry.getValue(); + ReplicationParams.serializer.serialize(params, out, version); + out.writeInt(ranges.size()); + for (Range<Token> range : 
ranges) + { + Token.metadataSerializer.serialize(range.left, out, version); + Token.metadataSerializer.serialize(range.right, out, version); + } + } + } + + public AffectedRanges deserialize(DataInputPlus in, Version version) throws IOException + { + int size = in.readInt(); + Map<ReplicationParams, Set<Range<Token>>> map = Maps.newHashMapWithExpectedSize(size); + for (int x = 0; x < size; x++) + { + ReplicationParams params = ReplicationParams.serializer.deserialize(in, version); + int rangeSize = in.readInt(); + Set<Range<Token>> range = Sets.newHashSetWithExpectedSize(rangeSize); + for (int y = 0; y < rangeSize; y++) + { + range.add(new Range<>(Token.metadataSerializer.deserialize(in, version), + Token.metadataSerializer.deserialize(in, version))); + } + map.put(params, range); + } + return new AffectedRangesImpl(map); + } + + public long serializedSize(AffectedRanges t, Version version) + { + Map<ReplicationParams, Set<Range<Token>>> map = t.asMap(); + long size = sizeof(map.size()); + for (Map.Entry<ReplicationParams, Set<Range<Token>>> rangeEntry : map.entrySet()) + { + ReplicationParams params = rangeEntry.getKey(); + Set<Range<Token>> ranges = rangeEntry.getValue(); + size += ReplicationParams.serializer.serializedSize(params, version); + size += sizeof(ranges.size()); + for (Range<Token> range : ranges) + { + size += Token.metadataSerializer.serializedSize(range.left, version); + size += Token.metadataSerializer.serializedSize(range.right, version); + } + } + return size; + } + } + } + + private static final class AffectedRangesImpl implements AffectedRangesBuilder, AffectedRanges + { + private final Map<ReplicationParams, Set<Range<Token>>> map; + + public AffectedRangesImpl() + { + this(new HashMap<>()); + } + + public AffectedRangesImpl(Map<ReplicationParams, Set<Range<Token>>> map) + { + this.map = map; + } + + @Override + public AffectedRangesBuilder add(ReplicationParams params, Range<Token> range) + { + Set<Range<Token>> ranges = map.get(params); + if (ranges == null) + { + ranges = new HashSet<>(); + map.put(params, ranges); + } + + ranges.add(range); + return this; + } + + @Override + public Map<ReplicationParams, Set<Range<Token>>> asMap() + { + return map; + } + + @Override + public AffectedRanges build() + { + return this; + } + + @Override + public void foreach(BiConsumer<ReplicationParams, Set<Range<Token>>> fn) + { + map.forEach((k, v) -> fn.accept(k, Collections.unmodifiableSet(v))); + } + + @Override + public boolean intersects(AffectedRanges other) + { + if (other == EMPTY) + return false; + + for (Map.Entry<ReplicationParams, Set<Range<Token>>> e : ((AffectedRangesImpl) other).map.entrySet()) + { + for (Range<Token> otherRange : e.getValue()) + { + for (Range<Token> thisRange : map.get(e.getKey())) + { + if (thisRange.intersects(otherRange)) + return true; + } + } + } + + return false; + } + + @Override + public String toString() + { + return "AffectedRangesImpl{" + + "map=" + map + + '}'; + } + } + + public static class Key + { + public static final Serializer serializer = new Serializer(); + private final Epoch epoch; + + private Key(Epoch epoch) + { + this.epoch = epoch; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Key key1 = (Key) o; + return epoch.equals(key1.epoch); + } + + @Override + public int hashCode() + { + return Objects.hash(epoch); + } + + @Override + public String toString() + { + return "Key{" + + "key=" + epoch + + '}'; + } + + public static 
final class Serializer + { + public void serialize(Key t, DataOutputPlus out, Version version) throws IOException + { + Epoch.serializer.serialize(t.epoch, out, version); + } + + public Key deserialize(DataInputPlus in, Version version) throws IOException + { + return new Key(Epoch.serializer.deserialize(in, version)); + } + + public long serializedSize(Key t, Version version) + { + return Epoch.serializer.serializedSize(t.epoch, version); + } + } + } + + public static class Serializer implements MetadataSerializer<LockedRanges> + { + public void serialize(LockedRanges t, DataOutputPlus out, Version version) throws IOException + { + Epoch.serializer.serialize(t.lastModified, out, version); + out.writeInt(t.locked.size()); + for (Map.Entry<Key, AffectedRanges> entry : t.locked.entrySet()) + { + Key key = entry.getKey(); + Epoch.serializer.serialize(key.epoch, out, version); + AffectedRanges.Serializer.instance.serialize(entry.getValue(), out, version); + } + } + + public LockedRanges deserialize(DataInputPlus in, Version version) throws IOException + { + Epoch lastModified = Epoch.serializer.deserialize(in, version); + int size = in.readInt(); + if (size == 0) return new LockedRanges(lastModified, ImmutableMap.of()); + ImmutableMap.Builder<Key, AffectedRanges> result = ImmutableMap.builder(); + for (int i = 0; i < size; i++) + { + Key key = new Key(Epoch.serializer.deserialize(in, version)); + AffectedRanges ranges = AffectedRanges.Serializer.instance.deserialize(in, version); + result.put(key, ranges); + } + return new LockedRanges(lastModified, result.build()); + } + + public long serializedSize(LockedRanges t, Version version) + { + long size = Epoch.serializer.serializedSize(t.lastModified, version); + size += sizeof(t.locked.size()); + for (Map.Entry<Key, AffectedRanges> entry : t.locked.entrySet()) + { + Key key = entry.getKey(); + size += Epoch.serializer.serializedSize(key.epoch, version); + size += AffectedRanges.Serializer.instance.serializedSize(entry.getValue(), version); + } + return size; + } + } +}
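
Not part of the patch: a minimal sketch of the locking protocol LockedRanges is built for, assuming an operation identifies itself by the epoch of its triggering event. tryLock is a hypothetical helper; only keyFor, intersects, NOT_LOCKED and lock come from this commit.

    import org.apache.cassandra.tcm.Epoch;
    import org.apache.cassandra.tcm.sequences.LockedRanges;

    public class LockSketch
    {
        // Returns an updated LockedRanges if no in-flight operation holds a lock on
        // an intersecting range, otherwise returns the input unchanged so the
        // caller can reject or retry the new operation.
        static LockedRanges tryLock(LockedRanges current, Epoch epoch,
                                    LockedRanges.AffectedRanges ranges)
        {
            LockedRanges.Key holder = current.intersects(ranges);
            if (!holder.equals(LockedRanges.NOT_LOCKED))
                return current; // conflict with the lock held by 'holder'
            return current.lock(LockedRanges.keyFor(epoch), ranges);
        }
    }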
