http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java index db73b4f..8eb8603 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java @@ -21,6 +21,9 @@ import java.util.Collection; import java.util.Set; import java.util.UUID; +import com.google.common.base.Preconditions; + +import org.apache.cassandra.io.sstable.SSTable; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.TableMetadataRef; import org.apache.cassandra.db.RowIndexEntry; @@ -85,13 +88,15 @@ public class BigFormat implements SSTableFormat long keyCount, long repairedAt, UUID pendingRepair, + boolean isTransient, TableMetadataRef metadata, MetadataCollector metadataCollector, SerializationHeader header, Collection<SSTableFlushObserver> observers, LifecycleTransaction txn) { - return new BigTableWriter(descriptor, keyCount, repairedAt, pendingRepair, metadata, metadataCollector, header, observers, txn); + SSTable.validateRepairedMetadata(repairedAt, pendingRepair, isTransient); + return new BigTableWriter(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, observers, txn); } } @@ -120,7 +125,7 @@ public class BigFormat implements SSTableFormat // mb (3.0.7, 3.7): commit log lower bound included // mc (3.0.8, 3.9): commit log intervals included - // na (4.0.0): uncompressed chunks, pending repair session, checksummed sstable metadata file, new Bloomfilter format + // na (4.0.0): uncompressed chunks, pending repair session, isTransient, checksummed sstable metadata file, new Bloomfilter format // // NOTE: when adding a new version, please add that to LegacySSTableTest, too. @@ -131,6 +136,7 @@ public class BigFormat implements SSTableFormat public final boolean hasMaxCompressedLength; private final boolean hasPendingRepair; private final boolean hasMetadataChecksum; + private final boolean hasIsTransient; /** * CASSANDRA-9067: 4.0 bloom filter representation changed (two longs just swapped) * have no 'static' bits caused by using the same upper bits for both bloom filter and token distribution. @@ -148,6 +154,7 @@ public class BigFormat implements SSTableFormat hasCommitLogIntervals = version.compareTo("mc") >= 0; hasMaxCompressedLength = version.compareTo("na") >= 0; hasPendingRepair = version.compareTo("na") >= 0; + hasIsTransient = version.compareTo("na") >= 0; hasMetadataChecksum = version.compareTo("na") >= 0; hasOldBfFormat = version.compareTo("na") < 0; } @@ -176,6 +183,12 @@ public class BigFormat implements SSTableFormat } @Override + public boolean hasIsTransient() + { + return hasIsTransient; + } + + @Override public int correspondingMessagingVersion() { return correspondingMessagingVersion;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index b5488ed..7513e95 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@ -68,13 +68,14 @@ public class BigTableWriter extends SSTableWriter long keyCount, long repairedAt, UUID pendingRepair, + boolean isTransient, TableMetadataRef metadata, MetadataCollector metadataCollector, SerializationHeader header, Collection<SSTableFlushObserver> observers, LifecycleTransaction txn) { - super(descriptor, keyCount, repairedAt, pendingRepair, metadata, metadataCollector, header, observers); + super(descriptor, keyCount, repairedAt, pendingRepair, isTransient, metadata, metadataCollector, header, observers); txn.trackNew(this); // must track before any files are created if (compression) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java index 6a40d94..eb7b2c7 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/IMetadataSerializer.java @@ -71,7 +71,7 @@ public interface IMetadataSerializer void mutateLevel(Descriptor descriptor, int newLevel) throws IOException; /** - * Mutate the repairedAt time and pendingRepair ID + * Mutate the repairedAt time, pendingRepair ID, and transient status */ - void mutateRepaired(Descriptor descriptor, long newRepairedAt, UUID newPendingRepair) throws IOException; + public void mutateRepairMetadata(Descriptor descriptor, long newRepairedAt, UUID newPendingRepair, boolean isTransient) throws IOException; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java index 9d9c1a8..36c218b 100755 --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java @@ -83,7 +83,8 @@ public class MetadataCollector implements PartitionStatisticsCollector ActiveRepairService.UNREPAIRED_SSTABLE, -1, -1, - null); + null, + false); } protected EstimatedHistogram estimatedPartitionSize = defaultPartitionSizeHistogram(); @@ -272,7 +273,7 @@ public class MetadataCollector implements PartitionStatisticsCollector this.hasLegacyCounterShards = this.hasLegacyCounterShards || hasLegacyCounterShards; } - public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner, double bloomFilterFPChance, long repairedAt, UUID pendingRepair, SerializationHeader header) + public Map<MetadataType, MetadataComponent> finalizeMetadata(String partitioner, double bloomFilterFPChance, long repairedAt, UUID pendingRepair, boolean isTransient, SerializationHeader header) { Map<MetadataType, MetadataComponent> components = new EnumMap<>(MetadataType.class); components.put(MetadataType.VALIDATION, new ValidationMetadata(partitioner, bloomFilterFPChance)); @@ -294,7 +295,8 @@ public class MetadataCollector implements PartitionStatisticsCollector repairedAt, totalColumnsSet, totalRows, - pendingRepair)); + pendingRepair, + isTransient)); components.put(MetadataType.COMPACTION, new CompactionMetadata(cardinality)); components.put(MetadataType.HEADER, header.toComponent()); return components; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java index 74923a0..f76db2d 100644 --- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataSerializer.java @@ -230,7 +230,7 @@ public class MetadataSerializer implements IMetadataSerializer rewriteSSTableMetadata(descriptor, currentComponents); } - public void mutateRepaired(Descriptor descriptor, long newRepairedAt, UUID newPendingRepair) throws IOException + public void mutateRepairMetadata(Descriptor descriptor, long newRepairedAt, UUID newPendingRepair, boolean isTransient) throws IOException { if (logger.isTraceEnabled()) logger.trace("Mutating {} to repairedAt time {} and pendingRepair {}", @@ -238,7 +238,7 @@ public class MetadataSerializer implements IMetadataSerializer Map<MetadataType, MetadataComponent> currentComponents = deserialize(descriptor, EnumSet.allOf(MetadataType.class)); StatsMetadata stats = (StatsMetadata) currentComponents.remove(MetadataType.STATS); // mutate time & id - currentComponents.put(MetadataType.STATS, stats.mutateRepairedAt(newRepairedAt).mutatePendingRepair(newPendingRepair)); + currentComponents.put(MetadataType.STATS, stats.mutateRepairedMetadata(newRepairedAt, newPendingRepair, isTransient)); rewriteSSTableMetadata(descriptor, currentComponents); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java index 2b8ebef..f14fb5d 100755 --- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java +++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java @@ -64,6 +64,7 @@ public class StatsMetadata extends MetadataComponent public final long totalColumnsSet; public final long totalRows; public final UUID pendingRepair; + public final boolean isTransient; public StatsMetadata(EstimatedHistogram estimatedPartitionSize, EstimatedHistogram estimatedColumnCount, @@ -83,7 +84,8 @@ public class StatsMetadata extends MetadataComponent long repairedAt, long totalColumnsSet, long totalRows, - UUID pendingRepair) + UUID pendingRepair, + boolean isTransient) { this.estimatedPartitionSize = estimatedPartitionSize; this.estimatedColumnCount = estimatedColumnCount; @@ -104,6 +106,7 @@ public class StatsMetadata extends MetadataComponent this.totalColumnsSet = totalColumnsSet; this.totalRows = totalRows; this.pendingRepair = pendingRepair; + this.isTransient = isTransient; } public MetadataType getType() @@ -155,10 +158,11 @@ public class StatsMetadata extends MetadataComponent repairedAt, totalColumnsSet, totalRows, - pendingRepair); + pendingRepair, + isTransient); } - public StatsMetadata mutateRepairedAt(long newRepairedAt) + public StatsMetadata mutateRepairedMetadata(long newRepairedAt, UUID newPendingRepair, boolean newIsTransient) { return new StatsMetadata(estimatedPartitionSize, estimatedColumnCount, @@ -178,30 +182,8 @@ public class StatsMetadata extends MetadataComponent newRepairedAt, totalColumnsSet, totalRows, - pendingRepair); - } - - public StatsMetadata mutatePendingRepair(UUID newPendingRepair) - { - return new StatsMetadata(estimatedPartitionSize, - estimatedColumnCount, - commitLogIntervals, - minTimestamp, - maxTimestamp, - minLocalDeletionTime, - maxLocalDeletionTime, - minTTL, - maxTTL, - compressionRatio, - estimatedTombstoneDropTime, - sstableLevel, - minClusteringValues, - maxClusteringValues, - hasLegacyCounterShards, - repairedAt, - totalColumnsSet, - totalRows, - newPendingRepair); + newPendingRepair, + newIsTransient); } @Override @@ -292,6 +274,12 @@ public class StatsMetadata extends MetadataComponent if (component.pendingRepair != null) size += UUIDSerializer.serializer.serializedSize(component.pendingRepair, 0); } + + if (version.hasIsTransient()) + { + size += TypeSizes.sizeof(component.isTransient); + } + return size; } @@ -338,6 +326,11 @@ public class StatsMetadata extends MetadataComponent out.writeByte(0); } } + + if (version.hasIsTransient()) + { + out.writeBoolean(component.isTransient); + } } public StatsMetadata deserialize(Version version, DataInputPlus in) throws IOException @@ -386,6 +379,8 @@ public class StatsMetadata extends MetadataComponent pendingRepair = UUIDSerializer.serializer.deserialize(in, 0); } + boolean isTransient = version.hasIsTransient() && in.readBoolean(); + return new StatsMetadata(partitionSizes, columnCounts, commitLogIntervals, @@ -404,7 +399,8 @@ public class StatsMetadata extends MetadataComponent repairedAt, totalColumnsSet, totalRows, - pendingRepair); + pendingRepair, + isTransient); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java b/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java index 2ee8eea..2e7408b 100644 --- a/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java +++ b/src/java/org/apache/cassandra/locator/AbstractEndpointSnitch.java @@ -17,13 +17,12 @@ */ package org.apache.cassandra.locator; -import java.util.*; - +import com.google.common.collect.Iterables; import org.apache.cassandra.config.DatabaseDescriptor; public abstract class AbstractEndpointSnitch implements IEndpointSnitch { - public abstract int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2); + public abstract int compareEndpoints(InetAddressAndPort target, Replica r1, Replica r2); /** * Sorts the <tt>Collection</tt> of node addresses by proximity to the given address @@ -31,27 +30,9 @@ public abstract class AbstractEndpointSnitch implements IEndpointSnitch * @param unsortedAddress the nodes to sort * @return a new sorted <tt>List</tt> */ - public List<InetAddressAndPort> getSortedListByProximity(InetAddressAndPort address, Collection<InetAddressAndPort> unsortedAddress) - { - List<InetAddressAndPort> preferred = new ArrayList<>(unsortedAddress); - sortByProximity(address, preferred); - return preferred; - } - - /** - * Sorts the <tt>List</tt> of node addresses, in-place, by proximity to the given address - * @param address the address to sort the proximity by - * @param addresses the nodes to sort - */ - public void sortByProximity(final InetAddressAndPort address, List<InetAddressAndPort> addresses) + public <C extends ReplicaCollection<? extends C>> C sortedByProximity(final InetAddressAndPort address, C unsortedAddress) { - Collections.sort(addresses, new Comparator<InetAddressAndPort>() - { - public int compare(InetAddressAndPort a1, InetAddressAndPort a2) - { - return compareEndpoints(address, a1, a2); - } - }); + return unsortedAddress.sorted((r1, r2) -> compareEndpoints(address, r1, r2)); } public void gossiperStarting() @@ -59,7 +40,7 @@ public abstract class AbstractEndpointSnitch implements IEndpointSnitch // noop by default } - public boolean isWorthMergingForRangeQuery(List<InetAddressAndPort> merged, List<InetAddressAndPort> l1, List<InetAddressAndPort> l2) + public boolean isWorthMergingForRangeQuery(ReplicaCollection<?> merged, ReplicaCollection<?> l1, ReplicaCollection<?> l2) { // Querying remote DC is likely to be an order of magnitude slower than // querying locally, so 2 queries to local nodes is likely to still be @@ -70,14 +51,9 @@ public abstract class AbstractEndpointSnitch implements IEndpointSnitch : true; } - private boolean hasRemoteNode(List<InetAddressAndPort> l) + private boolean hasRemoteNode(ReplicaCollection<?> l) { String localDc = DatabaseDescriptor.getLocalDataCenter(); - for (InetAddressAndPort ep : l) - { - if (!localDc.equals(getDatacenter(ep))) - return true; - } - return false; + return Iterables.any(l, replica -> !localDc.equals(getDatacenter(replica))); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java b/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java index e91f6ac..08c41f0 100644 --- a/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java +++ b/src/java/org/apache/cassandra/locator/AbstractNetworkTopologySnitch.java @@ -37,8 +37,11 @@ public abstract class AbstractNetworkTopologySnitch extends AbstractEndpointSnit */ abstract public String getDatacenter(InetAddressAndPort endpoint); - public int compareEndpoints(InetAddressAndPort address, InetAddressAndPort a1, InetAddressAndPort a2) + @Override + public int compareEndpoints(InetAddressAndPort address, Replica r1, Replica r2) { + InetAddressAndPort a1 = r1.endpoint(); + InetAddressAndPort a2 = r2.endpoint(); if (address.equals(a1) && !address.equals(a2)) return -1; if (address.equals(a2) && !address.equals(a1)) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java b/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java new file mode 100644 index 0000000..ecf1296 --- /dev/null +++ b/src/java/org/apache/cassandra/locator/AbstractReplicaCollection.java @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.locator; + +import com.google.common.collect.Iterables; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.function.BiConsumer; +import java.util.function.BinaryOperator; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.function.Supplier; +import java.util.stream.Collector; +import java.util.stream.Stream; + +/** + * A collection like class for Replica objects. Since the Replica class contains inetaddress, range, and + * transient replication status, basic contains and remove methods can be ambiguous. Replicas forces you + * to be explicit about what you're checking the container for, or removing from it. + */ +public abstract class AbstractReplicaCollection<C extends AbstractReplicaCollection<C>> implements ReplicaCollection<C> +{ + protected static final List<Replica> EMPTY_LIST = new ArrayList<>(); // since immutable, can safely return this to avoid megamorphic callsites + + public static <C extends ReplicaCollection<C>, B extends Builder<C, ?, B>> Collector<Replica, B, C> collector(Set<Collector.Characteristics> characteristics, Supplier<B> supplier) + { + return new Collector<Replica, B, C>() + { + private final BiConsumer<B, Replica> accumulator = Builder::add; + private final BinaryOperator<B> combiner = (a, b) -> { a.addAll(b.mutable); return a; }; + private final Function<B, C> finisher = Builder::build; + public Supplier<B> supplier() { return supplier; } + public BiConsumer<B, Replica> accumulator() { return accumulator; } + public BinaryOperator<B> combiner() { return combiner; } + public Function<B, C> finisher() { return finisher; } + public Set<Characteristics> characteristics() { return characteristics; } + }; + } + + protected final List<Replica> list; + protected final boolean isSnapshot; + protected AbstractReplicaCollection(List<Replica> list, boolean isSnapshot) + { + this.list = list; + this.isSnapshot = isSnapshot; + } + + // if subList == null, should return self (or a clone thereof) + protected abstract C snapshot(List<Replica> subList); + protected abstract C self(); + /** + * construct a new Mutable of our own type, so that we can concatenate + * TODO: this isn't terribly pretty, but we need sometimes to select / merge two Endpoints of unknown type; + */ + public abstract Mutable<C> newMutable(int initialCapacity); + + + public C snapshot() + { + return isSnapshot ? self() + : snapshot(list.isEmpty() ? EMPTY_LIST + : new ArrayList<>(list)); + } + + public final C subList(int start, int end) + { + List<Replica> subList; + if (isSnapshot) + { + if (start == 0 && end == size()) return self(); + else if (start == end) subList = EMPTY_LIST; + else subList = list.subList(start, end); + } + else + { + if (start == end) subList = EMPTY_LIST; + else subList = new ArrayList<>(list.subList(start, end)); // TODO: we could take a subList here, but comodification checks stop us + } + return snapshot(subList); + } + + public final C filter(Predicate<Replica> predicate) + { + return filter(predicate, Integer.MAX_VALUE); + } + + public final C filter(Predicate<Replica> predicate, int limit) + { + if (isEmpty()) + return snapshot(); + + List<Replica> copy = null; + int beginRun = -1, endRun = -1; + int i = 0; + for (; i < list.size() ; ++i) + { + Replica replica = list.get(i); + if (predicate.test(replica)) + { + if (copy != null) + copy.add(replica); + else if (beginRun < 0) + beginRun = i; + else if (endRun > 0) + { + copy = new ArrayList<>(Math.min(limit, (list.size() - i) + (endRun - beginRun))); + for (int j = beginRun ; j < endRun ; ++j) + copy.add(list.get(j)); + copy.add(list.get(i)); + } + if (--limit == 0) + { + ++i; + break; + } + } + else if (beginRun >= 0 && endRun < 0) + endRun = i; + } + + if (beginRun < 0) + beginRun = endRun = 0; + if (endRun < 0) + endRun = i; + if (copy == null) + return subList(beginRun, endRun); + return snapshot(copy); + } + + public final class Select + { + private final List<Replica> result; + public Select(int expectedSize) + { + this.result = new ArrayList<>(expectedSize); + } + + /** + * Add matching replica to the result; this predicate should be mutually exclusive with all prior predicates. + * Stop once we have targetSize replicas in total, including preceding calls + */ + public Select add(Predicate<Replica> predicate, int targetSize) + { + assert !Iterables.any(result, predicate::test); + for (int i = 0 ; result.size() < targetSize && i < list.size() ; ++i) + if (predicate.test(list.get(i))) + result.add(list.get(i)); + return this; + } + public Select add(Predicate<Replica> predicate) + { + return add(predicate, Integer.MAX_VALUE); + } + public C get() + { + return snapshot(result); + } + } + + /** + * An efficient method for selecting a subset of replica via a sequence of filters. + * + * Example: select().add(filter1).add(filter2, 3).get(); + * + * @return a Select object + */ + public final Select select() + { + return select(list.size()); + } + public final Select select(int expectedSize) + { + return new Select(expectedSize); + } + + public final C sorted(Comparator<Replica> comparator) + { + List<Replica> copy = new ArrayList<>(list); + copy.sort(comparator); + return snapshot(copy); + } + + public final Replica get(int i) + { + return list.get(i); + } + + public final int size() + { + return list.size(); + } + + public final boolean isEmpty() + { + return list.isEmpty(); + } + + public final Iterator<Replica> iterator() + { + return list.iterator(); + } + + public final Stream<Replica> stream() { return list.stream(); } + + public final boolean equals(Object o) + { + if (this == o) return true; + if (!(o instanceof AbstractReplicaCollection<?>)) + { + if (!(o instanceof ReplicaCollection<?>)) + return false; + + ReplicaCollection<?> that = (ReplicaCollection<?>) o; + return Iterables.elementsEqual(this, that); + } + AbstractReplicaCollection<?> that = (AbstractReplicaCollection<?>) o; + return Objects.equals(list, that.list); + } + + public final int hashCode() + { + return list.hashCode(); + } + + @Override + public final String toString() + { + return list.toString(); + } + + static <C extends AbstractReplicaCollection<C>> C concat(C replicas, C extraReplicas, Mutable.Conflict ignoreConflicts) + { + if (extraReplicas.isEmpty()) + return replicas; + if (replicas.isEmpty()) + return extraReplicas; + Mutable<C> mutable = replicas.newMutable(replicas.size() + extraReplicas.size()); + mutable.addAll(replicas); + mutable.addAll(extraReplicas, ignoreConflicts); + return mutable.asImmutableView(); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java index 3e9b5bb..0ddc0a4 100644 --- a/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java +++ b/src/java/org/apache/cassandra/locator/AbstractReplicationStrategy.java @@ -22,8 +22,8 @@ import java.lang.reflect.InvocationTargetException; import java.util.*; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.HashMultimap; -import com.google.common.collect.Multimap; +import com.google.common.base.Preconditions; +import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,9 +73,9 @@ public abstract class AbstractReplicationStrategy // lazy-initialize keyspace itself since we don't create them until after the replication strategies } - private final Map<Token, ArrayList<InetAddressAndPort>> cachedEndpoints = new NonBlockingHashMap<Token, ArrayList<InetAddressAndPort>>(); + private final Map<Token, EndpointsForRange> cachedReplicas = new NonBlockingHashMap<>(); - public ArrayList<InetAddressAndPort> getCachedEndpoints(Token t) + public EndpointsForRange getCachedReplicas(Token t) { long lastVersion = tokenMetadata.getRingVersion(); @@ -86,13 +86,13 @@ public abstract class AbstractReplicationStrategy if (lastVersion > lastInvalidatedVersion) { logger.trace("clearing cached endpoints"); - cachedEndpoints.clear(); + cachedReplicas.clear(); lastInvalidatedVersion = lastVersion; } } } - return cachedEndpoints.get(t); + return cachedReplicas.get(t); } /** @@ -102,64 +102,65 @@ public abstract class AbstractReplicationStrategy * @param searchPosition the position the natural endpoints are requested for * @return a copy of the natural endpoints for the given token */ - public ArrayList<InetAddressAndPort> getNaturalEndpoints(RingPosition searchPosition) + public EndpointsForToken getNaturalReplicasForToken(RingPosition searchPosition) + { + return getNaturalReplicas(searchPosition).forToken(searchPosition.getToken()); + } + + public EndpointsForRange getNaturalReplicas(RingPosition searchPosition) { Token searchToken = searchPosition.getToken(); Token keyToken = TokenMetadata.firstToken(tokenMetadata.sortedTokens(), searchToken); - ArrayList<InetAddressAndPort> endpoints = getCachedEndpoints(keyToken); + EndpointsForRange endpoints = getCachedReplicas(keyToken); if (endpoints == null) { TokenMetadata tm = tokenMetadata.cachedOnlyTokenMap(); // if our cache got invalidated, it's possible there is a new token to account for too keyToken = TokenMetadata.firstToken(tm.sortedTokens(), searchToken); - endpoints = new ArrayList<InetAddressAndPort>(calculateNaturalEndpoints(searchToken, tm)); - cachedEndpoints.put(keyToken, endpoints); + endpoints = calculateNaturalReplicas(searchToken, tm); + cachedReplicas.put(keyToken, endpoints); } - return new ArrayList<InetAddressAndPort>(endpoints); + return endpoints; } /** * calculate the natural endpoints for the given token * - * @see #getNaturalEndpoints(org.apache.cassandra.dht.RingPosition) + * @see #getNaturalReplicasForToken(org.apache.cassandra.dht.RingPosition) * * @param searchToken the token the natural endpoints are requested for * @return a copy of the natural endpoints for the given token */ - public abstract List<InetAddressAndPort> calculateNaturalEndpoints(Token searchToken, TokenMetadata tokenMetadata); + public abstract EndpointsForRange calculateNaturalReplicas(Token searchToken, TokenMetadata tokenMetadata); - public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(Collection<InetAddressAndPort> naturalEndpoints, - Collection<InetAddressAndPort> pendingEndpoints, - ConsistencyLevel consistency_level, + public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(ReplicaLayout.ForToken replicaLayout, Runnable callback, WriteType writeType, long queryStartNanoTime) { - return getWriteResponseHandler(naturalEndpoints, pendingEndpoints, consistency_level, callback, writeType, queryStartNanoTime, DatabaseDescriptor.getIdealConsistencyLevel()); + return getWriteResponseHandler(replicaLayout, callback, writeType, queryStartNanoTime, DatabaseDescriptor.getIdealConsistencyLevel()); } - public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(Collection<InetAddressAndPort> naturalEndpoints, - Collection<InetAddressAndPort> pendingEndpoints, - ConsistencyLevel consistency_level, + public <T> AbstractWriteResponseHandler<T> getWriteResponseHandler(ReplicaLayout.ForToken replicaLayout, Runnable callback, WriteType writeType, long queryStartNanoTime, ConsistencyLevel idealConsistencyLevel) { AbstractWriteResponseHandler resultResponseHandler; - if (consistency_level.isDatacenterLocal()) + if (replicaLayout.consistencyLevel.isDatacenterLocal()) { // block for in this context will be localnodes block. - resultResponseHandler = new DatacenterWriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime); + resultResponseHandler = new DatacenterWriteResponseHandler<T>(replicaLayout, callback, writeType, queryStartNanoTime); } - else if (consistency_level == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy)) + else if (replicaLayout.consistencyLevel == ConsistencyLevel.EACH_QUORUM && (this instanceof NetworkTopologyStrategy)) { - resultResponseHandler = new DatacenterSyncWriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime); + resultResponseHandler = new DatacenterSyncWriteResponseHandler<T>(replicaLayout, callback, writeType, queryStartNanoTime); } else { - resultResponseHandler = new WriteResponseHandler<T>(naturalEndpoints, pendingEndpoints, consistency_level, getKeyspace(), callback, writeType, queryStartNanoTime); + resultResponseHandler = new WriteResponseHandler<T>(replicaLayout, callback, writeType, queryStartNanoTime); } //Check if tracking the ideal consistency level is configured @@ -168,16 +169,14 @@ public abstract class AbstractReplicationStrategy //If ideal and requested are the same just use this handler to track the ideal consistency level //This is also used so that the ideal consistency level handler when constructed knows it is the ideal //one for tracking purposes - if (idealConsistencyLevel == consistency_level) + if (idealConsistencyLevel == replicaLayout.consistencyLevel) { resultResponseHandler.setIdealCLResponseHandler(resultResponseHandler); } else { //Construct a delegate response handler to use to track the ideal consistency level - AbstractWriteResponseHandler idealHandler = getWriteResponseHandler(naturalEndpoints, - pendingEndpoints, - idealConsistencyLevel, + AbstractWriteResponseHandler idealHandler = getWriteResponseHandler(replicaLayout.withConsistencyLevel(idealConsistencyLevel), callback, writeType, queryStartNanoTime, @@ -202,7 +201,12 @@ public abstract class AbstractReplicationStrategy * * @return the replication factor */ - public abstract int getReplicationFactor(); + public abstract ReplicationFactor getReplicationFactor(); + + public boolean hasTransientReplicas() + { + return getReplicationFactor().hasTransientReplicas(); + } /* * NOTE: this is pretty inefficient. also the inverse (getRangeAddresses) below. @@ -210,53 +214,81 @@ public abstract class AbstractReplicationStrategy * (fixing this would probably require merging tokenmetadata into replicationstrategy, * so we could cache/invalidate cleanly.) */ - public Multimap<InetAddressAndPort, Range<Token>> getAddressRanges(TokenMetadata metadata) + public RangesByEndpoint getAddressReplicas(TokenMetadata metadata) { - Multimap<InetAddressAndPort, Range<Token>> map = HashMultimap.create(); + RangesByEndpoint.Mutable map = new RangesByEndpoint.Mutable(); for (Token token : metadata.sortedTokens()) { Range<Token> range = metadata.getPrimaryRangeFor(token); - for (InetAddressAndPort ep : calculateNaturalEndpoints(token, metadata)) + for (Replica replica : calculateNaturalReplicas(token, metadata)) { - map.put(ep, range); + // LocalStrategy always returns (min, min] ranges for it's replicas, so we skip the check here + Preconditions.checkState(range.equals(replica.range()) || this instanceof LocalStrategy); + map.put(replica.endpoint(), replica); } } - return map; + return map.asImmutableView(); } - public Multimap<Range<Token>, InetAddressAndPort> getRangeAddresses(TokenMetadata metadata) + public RangesAtEndpoint getAddressReplicas(TokenMetadata metadata, InetAddressAndPort endpoint) { - Multimap<Range<Token>, InetAddressAndPort> map = HashMultimap.create(); + RangesAtEndpoint.Builder builder = RangesAtEndpoint.builder(endpoint); + for (Token token : metadata.sortedTokens()) + { + Range<Token> range = metadata.getPrimaryRangeFor(token); + Replica replica = calculateNaturalReplicas(token, metadata) + .byEndpoint().get(endpoint); + if (replica != null) + { + // LocalStrategy always returns (min, min] ranges for it's replicas, so we skip the check here + Preconditions.checkState(range.equals(replica.range()) || this instanceof LocalStrategy); + builder.add(replica, Conflict.DUPLICATE); + } + } + return builder.build(); + } + + + public EndpointsByRange getRangeAddresses(TokenMetadata metadata) + { + EndpointsByRange.Mutable map = new EndpointsByRange.Mutable(); for (Token token : metadata.sortedTokens()) { Range<Token> range = metadata.getPrimaryRangeFor(token); - for (InetAddressAndPort ep : calculateNaturalEndpoints(token, metadata)) + for (Replica replica : calculateNaturalReplicas(token, metadata)) { - map.put(range, ep); + // LocalStrategy always returns (min, min] ranges for it's replicas, so we skip the check here + Preconditions.checkState(range.equals(replica.range()) || this instanceof LocalStrategy); + map.put(range, replica); } } - return map; + return map.asImmutableView(); } - public Multimap<InetAddressAndPort, Range<Token>> getAddressRanges() + public RangesByEndpoint getAddressReplicas() { - return getAddressRanges(tokenMetadata.cloneOnlyTokenMap()); + return getAddressReplicas(tokenMetadata.cloneOnlyTokenMap()); } - public Collection<Range<Token>> getPendingAddressRanges(TokenMetadata metadata, Token pendingToken, InetAddressAndPort pendingAddress) + public RangesAtEndpoint getAddressReplicas(InetAddressAndPort endpoint) { - return getPendingAddressRanges(metadata, Arrays.asList(pendingToken), pendingAddress); + return getAddressReplicas(tokenMetadata.cloneOnlyTokenMap(), endpoint); } - public Collection<Range<Token>> getPendingAddressRanges(TokenMetadata metadata, Collection<Token> pendingTokens, InetAddressAndPort pendingAddress) + public RangesAtEndpoint getPendingAddressRanges(TokenMetadata metadata, Token pendingToken, InetAddressAndPort pendingAddress) + { + return getPendingAddressRanges(metadata, Collections.singleton(pendingToken), pendingAddress); + } + + public RangesAtEndpoint getPendingAddressRanges(TokenMetadata metadata, Collection<Token> pendingTokens, InetAddressAndPort pendingAddress) { TokenMetadata temp = metadata.cloneOnlyTokenMap(); temp.updateNormalTokens(pendingTokens, pendingAddress); - return getAddressRanges(temp).get(pendingAddress); + return getAddressReplicas(temp, pendingAddress); } public abstract void validateOptions() throws ConfigurationException; @@ -329,6 +361,10 @@ public abstract class AbstractReplicationStrategy AbstractReplicationStrategy strategy = createInternal(keyspaceName, strategyClass, tokenMetadata, snitch, strategyOptions); strategy.validateExpectedOptions(); strategy.validateOptions(); + if (strategy.hasTransientReplicas() && !DatabaseDescriptor.isTransientReplicationEnabled()) + { + throw new ConfigurationException("Transient replication is disabled. Enable in cassandra.yaml to use."); + } } public static Class<AbstractReplicationStrategy> getClass(String cls) throws ConfigurationException @@ -344,21 +380,23 @@ public abstract class AbstractReplicationStrategy public boolean hasSameSettings(AbstractReplicationStrategy other) { - return getClass().equals(other.getClass()) && getReplicationFactor() == other.getReplicationFactor(); + return getClass().equals(other.getClass()) && getReplicationFactor().equals(other.getReplicationFactor()); } - protected void validateReplicationFactor(String rf) throws ConfigurationException + protected void validateReplicationFactor(String s) throws ConfigurationException { try { - if (Integer.parseInt(rf) < 0) + ReplicationFactor rf = ReplicationFactor.fromString(s); + if (rf.hasTransientReplicas()) { - throw new ConfigurationException("Replication factor must be non-negative; found " + rf); + if (DatabaseDescriptor.getNumTokens() > 1) + throw new ConfigurationException(String.format("Transient replication is not supported with vnodes yet")); } } - catch (NumberFormatException e2) + catch (IllegalArgumentException e) { - throw new ConfigurationException("Replication factor must be numeric; found " + rf); + throw new ConfigurationException(e.getMessage()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java index 010c892..d35f1fb 100644 --- a/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java +++ b/src/java/org/apache/cassandra/locator/DynamicEndpointSnitch.java @@ -42,7 +42,6 @@ import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.FBUtilities; - /** * A dynamic snitch that sorts endpoints by latency with an adapted phi failure detector */ @@ -185,55 +184,38 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa return subsnitch.getDatacenter(endpoint); } - public List<InetAddressAndPort> getSortedListByProximity(final InetAddressAndPort address, Collection<InetAddressAndPort> addresses) - { - List<InetAddressAndPort> list = new ArrayList<>(addresses); - sortByProximity(address, list); - return list; - } - @Override - public void sortByProximity(final InetAddressAndPort address, List<InetAddressAndPort> addresses) + public <C extends ReplicaCollection<? extends C>> C sortedByProximity(final InetAddressAndPort address, C unsortedAddresses) { assert address.equals(FBUtilities.getBroadcastAddressAndPort()); // we only know about ourself - if (dynamicBadnessThreshold == 0) - { - sortByProximityWithScore(address, addresses); - } - else - { - sortByProximityWithBadness(address, addresses); - } + return dynamicBadnessThreshold == 0 + ? sortedByProximityWithScore(address, unsortedAddresses) + : sortedByProximityWithBadness(address, unsortedAddresses); } - private void sortByProximityWithScore(final InetAddressAndPort address, List<InetAddressAndPort> addresses) + private <C extends ReplicaCollection<? extends C>> C sortedByProximityWithScore(final InetAddressAndPort address, C unsortedAddresses) { // Scores can change concurrently from a call to this method. But Collections.sort() expects // its comparator to be "stable", that is 2 endpoint should compare the same way for the duration // of the sort() call. As we copy the scores map on write, it is thus enough to alias the current // version of it during this call. final HashMap<InetAddressAndPort, Double> scores = this.scores; - Collections.sort(addresses, new Comparator<InetAddressAndPort>() - { - public int compare(InetAddressAndPort a1, InetAddressAndPort a2) - { - return compareEndpoints(address, a1, a2, scores); - } - }); + return unsortedAddresses.sorted((r1, r2) -> compareEndpoints(address, r1, r2, scores)); } - private void sortByProximityWithBadness(final InetAddressAndPort address, List<InetAddressAndPort> addresses) + private <C extends ReplicaCollection<? extends C>> C sortedByProximityWithBadness(final InetAddressAndPort address, C replicas) { - if (addresses.size() < 2) - return; + if (replicas.size() < 2) + return replicas; - subsnitch.sortByProximity(address, addresses); + // TODO: avoid copy + replicas = subsnitch.sortedByProximity(address, replicas); HashMap<InetAddressAndPort, Double> scores = this.scores; // Make sure the score don't change in the middle of the loop below // (which wouldn't really matter here but its cleaner that way). - ArrayList<Double> subsnitchOrderedScores = new ArrayList<>(addresses.size()); - for (InetAddressAndPort inet : addresses) + ArrayList<Double> subsnitchOrderedScores = new ArrayList<>(replicas.size()); + for (Replica replica : replicas) { - Double score = scores.get(inet); + Double score = scores.get(replica.endpoint()); if (score == null) score = 0.0; subsnitchOrderedScores.add(score); @@ -250,17 +232,18 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa { if (subsnitchScore > (sortedScoreIterator.next() * (1.0 + dynamicBadnessThreshold))) { - sortByProximityWithScore(address, addresses); - return; + return sortedByProximityWithScore(address, replicas); } } + + return replicas; } // Compare endpoints given an immutable snapshot of the scores - private int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2, Map<InetAddressAndPort, Double> scores) + private int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2, Map<InetAddressAndPort, Double> scores) { - Double scored1 = scores.get(a1); - Double scored2 = scores.get(a2); + Double scored1 = scores.get(a1.endpoint()); + Double scored2 = scores.get(a2.endpoint()); if (scored1 == null) { @@ -280,7 +263,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa return 1; } - public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2) + public int compareEndpoints(InetAddressAndPort target, Replica a1, Replica a2) { // That function is fundamentally unsafe because the scores can change at any time and so the result of that // method is not stable for identical arguments. This is why we don't rely on super.sortByProximity() in @@ -414,7 +397,7 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa return getSeverity(FBUtilities.getBroadcastAddressAndPort()); } - public boolean isWorthMergingForRangeQuery(List<InetAddressAndPort> merged, List<InetAddressAndPort> l1, List<InetAddressAndPort> l2) + public boolean isWorthMergingForRangeQuery(ReplicaCollection<?> merged, ReplicaCollection<?> l1, ReplicaCollection<?> l2) { if (!subsnitch.isWorthMergingForRangeQuery(merged, l1, l2)) return false; @@ -434,12 +417,12 @@ public class DynamicEndpointSnitch extends AbstractEndpointSnitch implements ILa } // Return the max score for the endpoint in the provided list, or -1.0 if no node have a score. - private double maxScore(List<InetAddressAndPort> endpoints) + private double maxScore(ReplicaCollection<?> endpoints) { double maxScore = -1.0; - for (InetAddressAndPort endpoint : endpoints) + for (Replica replica : endpoints) { - Double score = scores.get(endpoint); + Double score = scores.get(replica.endpoint()); if (score == null) continue; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/Ec2Snitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/Ec2Snitch.java b/src/java/org/apache/cassandra/locator/Ec2Snitch.java index b6aafd3..d0474e4 100644 --- a/src/java/org/apache/cassandra/locator/Ec2Snitch.java +++ b/src/java/org/apache/cassandra/locator/Ec2Snitch.java @@ -68,7 +68,7 @@ public class Ec2Snitch extends AbstractNetworkTopologySnitch { String az = awsApiCall(ZONE_NAME_QUERY_URL); - // if using the full naming scheme, region name is created by removing letters from the + // if using the full naming scheme, region name is created by removing letters from the // end of the availability zone and zone is the full zone name usingLegacyNaming = isUsingLegacyNaming(props); String region; http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/Endpoints.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/Endpoints.java b/src/java/org/apache/cassandra/locator/Endpoints.java new file mode 100644 index 0000000..3d5faa4 --- /dev/null +++ b/src/java/org/apache/cassandra/locator/Endpoints.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.locator; + +import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict; +import org.apache.cassandra.utils.FBUtilities; + +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +public abstract class Endpoints<E extends Endpoints<E>> extends AbstractReplicaCollection<E> +{ + static final Map<InetAddressAndPort, Replica> EMPTY_MAP = Collections.unmodifiableMap(new LinkedHashMap<>()); + + volatile Map<InetAddressAndPort, Replica> byEndpoint; + + Endpoints(List<Replica> list, boolean isSnapshot, Map<InetAddressAndPort, Replica> byEndpoint) + { + super(list, isSnapshot); + this.byEndpoint = byEndpoint; + } + + @Override + public Set<InetAddressAndPort> endpoints() + { + return byEndpoint().keySet(); + } + + public Map<InetAddressAndPort, Replica> byEndpoint() + { + Map<InetAddressAndPort, Replica> map = byEndpoint; + if (map == null) + byEndpoint = map = buildByEndpoint(list); + return map; + } + + public boolean contains(InetAddressAndPort endpoint, boolean isFull) + { + Replica replica = byEndpoint().get(endpoint); + return replica != null && replica.isFull() == isFull; + } + + @Override + public boolean contains(Replica replica) + { + return replica != null + && Objects.equals( + byEndpoint().get(replica.endpoint()), + replica); + } + + private static Map<InetAddressAndPort, Replica> buildByEndpoint(List<Replica> list) + { + // TODO: implement a delegating map that uses our superclass' list, and is immutable + Map<InetAddressAndPort, Replica> byEndpoint = new LinkedHashMap<>(list.size()); + for (Replica replica : list) + { + Replica prev = byEndpoint.put(replica.endpoint(), replica); + assert prev == null : "duplicate endpoint in EndpointsForRange: " + prev + " and " + replica; + } + + return Collections.unmodifiableMap(byEndpoint); + } + + public E withoutSelf() + { + InetAddressAndPort self = FBUtilities.getBroadcastAddressAndPort(); + return filter(r -> !self.equals(r.endpoint())); + } + + public E without(Set<InetAddressAndPort> remove) + { + return filter(r -> !remove.contains(r.endpoint())); + } + + public E keep(Set<InetAddressAndPort> keep) + { + return filter(r -> keep.contains(r.endpoint())); + } + + public E keep(Iterable<InetAddressAndPort> endpoints) + { + ReplicaCollection.Mutable<E> copy = newMutable( + endpoints instanceof Collection<?> + ? ((Collection<InetAddressAndPort>) endpoints).size() + : size() + ); + Map<InetAddressAndPort, Replica> byEndpoint = byEndpoint(); + for (InetAddressAndPort endpoint : endpoints) + { + Replica keep = byEndpoint.get(endpoint); + if (keep == null) + continue; + copy.add(keep, ReplicaCollection.Mutable.Conflict.DUPLICATE); + } + return copy.asSnapshot(); + } + + /** + * Care must be taken to ensure no conflicting ranges occur in pending and natural. + * Conflicts can occur for two reasons: + * 1) due to lack of isolation when reading pending/natural + * 2) because a movement that changes the type of replication from transient to full must be handled + * differently for reads and writes (with the reader treating it as transient, and writer as full) + * + * The method haveConflicts() below, and resolveConflictsInX, are used to detect and resolve any issues + */ + public static <E extends Endpoints<E>> E concat(E natural, E pending) + { + return AbstractReplicaCollection.concat(natural, pending, Conflict.NONE); + } + + public static <E extends Endpoints<E>> boolean haveConflicts(E natural, E pending) + { + Set<InetAddressAndPort> naturalEndpoints = natural.endpoints(); + for (InetAddressAndPort pendingEndpoint : pending.endpoints()) + { + if (naturalEndpoints.contains(pendingEndpoint)) + return true; + } + return false; + } + + // must apply first + public static <E extends Endpoints<E>> E resolveConflictsInNatural(E natural, E pending) + { + return natural.filter(r -> !r.isTransient() || !pending.contains(r.endpoint(), true)); + } + + // must apply second + public static <E extends Endpoints<E>> E resolveConflictsInPending(E natural, E pending) + { + return pending.without(natural.endpoints()); + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/EndpointsByRange.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/EndpointsByRange.java b/src/java/org/apache/cassandra/locator/EndpointsByRange.java new file mode 100644 index 0000000..cdc8a68 --- /dev/null +++ b/src/java/org/apache/cassandra/locator/EndpointsByRange.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.locator; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict; + +import java.util.Collections; +import java.util.Map; + +public class EndpointsByRange extends ReplicaMultimap<Range<Token>, EndpointsForRange> +{ + public EndpointsByRange(Map<Range<Token>, EndpointsForRange> map) + { + super(map); + } + + public EndpointsForRange get(Range<Token> range) + { + Preconditions.checkNotNull(range); + return map.getOrDefault(range, EndpointsForRange.empty(range)); + } + + public static class Mutable extends ReplicaMultimap.Mutable<Range<Token>, EndpointsForRange.Mutable> + { + @Override + protected EndpointsForRange.Mutable newMutable(Range<Token> range) + { + return new EndpointsForRange.Mutable(range); + } + + // TODO: consider all ignoreDuplicates cases + public void putAll(Range<Token> range, EndpointsForRange replicas, Conflict ignoreConflicts) + { + get(range).addAll(replicas, ignoreConflicts); + } + + public EndpointsByRange asImmutableView() + { + return new EndpointsByRange(Collections.unmodifiableMap(Maps.transformValues(map, EndpointsForRange.Mutable::asImmutableView))); + } + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/EndpointsByReplica.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/EndpointsByReplica.java b/src/java/org/apache/cassandra/locator/EndpointsByReplica.java new file mode 100644 index 0000000..ceea2d1 --- /dev/null +++ b/src/java/org/apache/cassandra/locator/EndpointsByReplica.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.locator; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import org.apache.cassandra.locator.ReplicaCollection.Mutable.Conflict; + +import java.util.Collections; +import java.util.Map; + +public class EndpointsByReplica extends ReplicaMultimap<Replica, EndpointsForRange> +{ + public EndpointsByReplica(Map<Replica, EndpointsForRange> map) + { + super(map); + } + + public EndpointsForRange get(Replica range) + { + Preconditions.checkNotNull(range); + return map.getOrDefault(range, EndpointsForRange.empty(range.range())); + } + + public static class Mutable extends ReplicaMultimap.Mutable<Replica, EndpointsForRange.Mutable> + { + @Override + protected EndpointsForRange.Mutable newMutable(Replica replica) + { + return new EndpointsForRange.Mutable(replica.range()); + } + + // TODO: consider all ignoreDuplicates cases + public void putAll(Replica range, EndpointsForRange replicas, Conflict ignoreConflicts) + { + map.computeIfAbsent(range, r -> newMutable(r)).addAll(replicas, ignoreConflicts); + } + + public EndpointsByReplica asImmutableView() + { + return new EndpointsByReplica(Collections.unmodifiableMap(Maps.transformValues(map, EndpointsForRange.Mutable::asImmutableView))); + } + } + +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/EndpointsForRange.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/EndpointsForRange.java b/src/java/org/apache/cassandra/locator/EndpointsForRange.java new file mode 100644 index 0000000..c2d8232 --- /dev/null +++ b/src/java/org/apache/cassandra/locator/EndpointsForRange.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.locator; + +import com.google.common.base.Preconditions; +import org.apache.cassandra.dht.Range; +import org.apache.cassandra.dht.Token; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import static com.google.common.collect.Iterables.all; + +/** + * A ReplicaCollection where all Replica are required to cover a range that fully contains the range() defined in the builder(). + * Endpoints are guaranteed to be unique; on construction, this is enforced unless optionally silenced (in which case + * only the first occurrence makes the cut). + */ +public class EndpointsForRange extends Endpoints<EndpointsForRange> +{ + private final Range<Token> range; + private EndpointsForRange(Range<Token> range, List<Replica> list, boolean isSnapshot) + { + this(range, list, isSnapshot, null); + } + private EndpointsForRange(Range<Token> range, List<Replica> list, boolean isSnapshot, Map<InetAddressAndPort, Replica> byEndpoint) + { + super(list, isSnapshot, byEndpoint); + this.range = range; + assert range != null; + } + + public Range<Token> range() + { + return range; + } + + @Override + public Mutable newMutable(int initialCapacity) + { + return new Mutable(range, initialCapacity); + } + + public EndpointsForToken forToken(Token token) + { + if (!range.contains(token)) + throw new IllegalArgumentException(token + " is not contained within " + range); + return new EndpointsForToken(token, list, isSnapshot, byEndpoint); + } + + @Override + public EndpointsForRange self() + { + return this; + } + + @Override + protected EndpointsForRange snapshot(List<Replica> snapshot) + { + if (snapshot.isEmpty()) return empty(range); + return new EndpointsForRange(range, snapshot, true); + } + + public static class Mutable extends EndpointsForRange implements ReplicaCollection.Mutable<EndpointsForRange> + { + boolean hasSnapshot; + public Mutable(Range<Token> range) { this(range, 0); } + public Mutable(Range<Token> range, int capacity) { super(range, new ArrayList<>(capacity), false, new LinkedHashMap<>()); } + + public void add(Replica replica, Conflict ignoreConflict) + { + if (hasSnapshot) throw new IllegalStateException(); + Preconditions.checkNotNull(replica); + if (!replica.range().contains(super.range)) + throw new IllegalArgumentException("Replica " + replica + " does not contain " + super.range); + + Replica prev = super.byEndpoint.put(replica.endpoint(), replica); + if (prev != null) + { + super.byEndpoint.put(replica.endpoint(), prev); // restore prev + switch (ignoreConflict) + { + case DUPLICATE: + if (prev.equals(replica)) + break; + case NONE: + throw new IllegalArgumentException("Conflicting replica added (expected unique endpoints): " + replica + "; existing: " + prev); + case ALL: + } + return; + } + + list.add(replica); + } + + @Override + public Map<InetAddressAndPort, Replica> byEndpoint() + { + // our internal map is modifiable, but it is unsafe to modify the map externally + // it would be possible to implement a safe modifiable map, but it is probably not valuable + return Collections.unmodifiableMap(super.byEndpoint()); + } + + private EndpointsForRange get(boolean isSnapshot) + { + return new EndpointsForRange(super.range, super.list, isSnapshot, Collections.unmodifiableMap(super.byEndpoint)); + } + + public EndpointsForRange asImmutableView() + { + return get(false); + } + + public EndpointsForRange asSnapshot() + { + hasSnapshot = true; + return get(true); + } + } + + public static class Builder extends ReplicaCollection.Builder<EndpointsForRange, Mutable, EndpointsForRange.Builder> + { + public Builder(Range<Token> range) { this(range, 0); } + public Builder(Range<Token> range, int capacity) { super (new Mutable(range, capacity)); } + public boolean containsEndpoint(InetAddressAndPort endpoint) + { + return mutable.asImmutableView().byEndpoint.containsKey(endpoint); + } + } + + public static Builder builder(Range<Token> range) + { + return new Builder(range); + } + public static Builder builder(Range<Token> range, int capacity) + { + return new Builder(range, capacity); + } + + public static EndpointsForRange empty(Range<Token> range) + { + return new EndpointsForRange(range, EMPTY_LIST, true, EMPTY_MAP); + } + + public static EndpointsForRange of(Replica replica) + { + // we only use ArrayList or ArrayList.SubList, to ensure callsites are bimorphic + ArrayList<Replica> one = new ArrayList<>(1); + one.add(replica); + // we can safely use singletonMap, as we only otherwise use LinkedHashMap + return new EndpointsForRange(replica.range(), one, true, Collections.unmodifiableMap(Collections.singletonMap(replica.endpoint(), replica))); + } + + public static EndpointsForRange of(Replica ... replicas) + { + return copyOf(Arrays.asList(replicas)); + } + + public static EndpointsForRange copyOf(Collection<Replica> replicas) + { + if (replicas.isEmpty()) + throw new IllegalArgumentException("Collection must be non-empty to copy"); + Range<Token> range = replicas.iterator().next().range(); + assert all(replicas, r -> range.equals(r.range())); + return builder(range, replicas.size()).addAll(replicas).build(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/EndpointsForToken.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/EndpointsForToken.java b/src/java/org/apache/cassandra/locator/EndpointsForToken.java new file mode 100644 index 0000000..f24c615 --- /dev/null +++ b/src/java/org/apache/cassandra/locator/EndpointsForToken.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.locator; + +import com.google.common.base.Preconditions; +import org.apache.cassandra.dht.Token; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * A ReplicaCollection where all Replica are required to cover a range that fully contains the token() defined in the builder(). + * Endpoints are guaranteed to be unique; on construction, this is enforced unless optionally silenced (in which case + * only the first occurrence makes the cut). + */ +public class EndpointsForToken extends Endpoints<EndpointsForToken> +{ + private final Token token; + private EndpointsForToken(Token token, List<Replica> list, boolean isSnapshot) + { + this(token, list, isSnapshot, null); + } + + EndpointsForToken(Token token, List<Replica> list, boolean isSnapshot, Map<InetAddressAndPort, Replica> byEndpoint) + { + super(list, isSnapshot, byEndpoint); + this.token = token; + assert token != null; + } + + public Token token() + { + return token; + } + + @Override + public Mutable newMutable(int initialCapacity) + { + return new Mutable(token, initialCapacity); + } + + @Override + public EndpointsForToken self() + { + return this; + } + + @Override + protected EndpointsForToken snapshot(List<Replica> subList) + { + if (subList.isEmpty()) return empty(token); + return new EndpointsForToken(token, subList, true); + } + + public static class Mutable extends EndpointsForToken implements ReplicaCollection.Mutable<EndpointsForToken> + { + boolean hasSnapshot; + public Mutable(Token token) { this(token, 0); } + public Mutable(Token token, int capacity) { super(token, new ArrayList<>(capacity), false, new LinkedHashMap<>()); } + + public void add(Replica replica, Conflict ignoreConflict) + { + if (hasSnapshot) throw new IllegalStateException(); + Preconditions.checkNotNull(replica); + if (!replica.range().contains(super.token)) + throw new IllegalArgumentException("Replica " + replica + " does not contain " + super.token); + + Replica prev = super.byEndpoint.put(replica.endpoint(), replica); + if (prev != null) + { + super.byEndpoint.put(replica.endpoint(), prev); // restore prev + switch (ignoreConflict) + { + case DUPLICATE: + if (prev.equals(replica)) + break; + case NONE: + throw new IllegalArgumentException("Conflicting replica added (expected unique endpoints): " + replica + "; existing: " + prev); + case ALL: + } + return; + } + + list.add(replica); + } + + @Override + public Map<InetAddressAndPort, Replica> byEndpoint() + { + // our internal map is modifiable, but it is unsafe to modify the map externally + // it would be possible to implement a safe modifiable map, but it is probably not valuable + return Collections.unmodifiableMap(super.byEndpoint()); + } + + private EndpointsForToken get(boolean isSnapshot) + { + return new EndpointsForToken(super.token, super.list, isSnapshot, Collections.unmodifiableMap(super.byEndpoint)); + } + + public EndpointsForToken asImmutableView() + { + return get(false); + } + + public EndpointsForToken asSnapshot() + { + hasSnapshot = true; + return get(true); + } + } + + public static class Builder extends ReplicaCollection.Builder<EndpointsForToken, Mutable, EndpointsForToken.Builder> + { + public Builder(Token token) { this(token, 0); } + public Builder(Token token, int capacity) { super (new Mutable(token, capacity)); } + } + + public static Builder builder(Token token) + { + return new Builder(token); + } + public static Builder builder(Token token, int capacity) + { + return new Builder(token, capacity); + } + + public static EndpointsForToken empty(Token token) + { + return new EndpointsForToken(token, EMPTY_LIST, true, EMPTY_MAP); + } + + public static EndpointsForToken of(Token token, Replica replica) + { + // we only use ArrayList or ArrayList.SubList, to ensure callsites are bimorphic + ArrayList<Replica> one = new ArrayList<>(1); + one.add(replica); + // we can safely use singletonMap, as we only otherwise use LinkedHashMap + return new EndpointsForToken(token, one, true, Collections.unmodifiableMap(Collections.singletonMap(replica.endpoint(), replica))); + } + + public static EndpointsForToken of(Token token, Replica ... replicas) + { + return copyOf(token, Arrays.asList(replicas)); + } + + public static EndpointsForToken copyOf(Token token, Collection<Replica> replicas) + { + if (replicas.isEmpty()) return empty(token); + return builder(token, replicas.size()).addAll(replicas).build(); + } +} http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/IEndpointSnitch.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/IEndpointSnitch.java b/src/java/org/apache/cassandra/locator/IEndpointSnitch.java index 63d333b..b7797b0 100644 --- a/src/java/org/apache/cassandra/locator/IEndpointSnitch.java +++ b/src/java/org/apache/cassandra/locator/IEndpointSnitch.java @@ -17,8 +17,6 @@ */ package org.apache.cassandra.locator; -import java.util.Collection; -import java.util.List; import java.util.Set; /** @@ -39,20 +37,20 @@ public interface IEndpointSnitch */ public String getDatacenter(InetAddressAndPort endpoint); - /** - * returns a new <tt>List</tt> sorted by proximity to the given endpoint - */ - public List<InetAddressAndPort> getSortedListByProximity(InetAddressAndPort address, Collection<InetAddressAndPort> unsortedAddress); + default public String getDatacenter(Replica replica) + { + return getDatacenter(replica.endpoint()); + } /** - * This method will sort the <tt>List</tt> by proximity to the given address. + * returns a new <tt>List</tt> sorted by proximity to the given endpoint */ - public void sortByProximity(InetAddressAndPort address, List<InetAddressAndPort> addresses); + public <C extends ReplicaCollection<? extends C>> C sortedByProximity(final InetAddressAndPort address, C addresses); /** * compares two endpoints in relation to the target endpoint, returning as Comparator.compare would */ - public int compareEndpoints(InetAddressAndPort target, InetAddressAndPort a1, InetAddressAndPort a2); + public int compareEndpoints(InetAddressAndPort target, Replica r1, Replica r2); /** * called after Gossiper instance exists immediately before it starts gossiping @@ -63,7 +61,7 @@ public interface IEndpointSnitch * Returns whether for a range query doing a query against merged is likely * to be faster than 2 sequential queries, one against l1 followed by one against l2. */ - public boolean isWorthMergingForRangeQuery(List<InetAddressAndPort> merged, List<InetAddressAndPort> l1, List<InetAddressAndPort> l2); + public boolean isWorthMergingForRangeQuery(ReplicaCollection<?> merged, ReplicaCollection<?> l1, ReplicaCollection<?> l2); /** * Determine if the datacenter or rack values in the current node's snitch conflict with those passed in parameters. http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/InetAddressAndPort.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/InetAddressAndPort.java b/src/java/org/apache/cassandra/locator/InetAddressAndPort.java index 38a1a49..a47c72a 100644 --- a/src/java/org/apache/cassandra/locator/InetAddressAndPort.java +++ b/src/java/org/apache/cassandra/locator/InetAddressAndPort.java @@ -25,6 +25,7 @@ import java.net.UnknownHostException; import com.google.common.base.Preconditions; import com.google.common.net.HostAndPort; +import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.FastByteOperations; /** @@ -191,9 +192,9 @@ public final class InetAddressAndPort implements Comparable<InetAddressAndPort>, return InetAddressAndPort.getByAddress(InetAddress.getLoopbackAddress()); } - public static InetAddressAndPort getLocalHost() throws UnknownHostException + public static InetAddressAndPort getLocalHost() { - return InetAddressAndPort.getByAddress(InetAddress.getLocalHost()); + return FBUtilities.getLocalAddressAndPort(); } public static void initializeDefaultPort(int port) http://git-wip-us.apache.org/repos/asf/cassandra/blob/f7431b43/src/java/org/apache/cassandra/locator/LocalStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/locator/LocalStrategy.java b/src/java/org/apache/cassandra/locator/LocalStrategy.java index a76fe96..41cc9b0 100644 --- a/src/java/org/apache/cassandra/locator/LocalStrategy.java +++ b/src/java/org/apache/cassandra/locator/LocalStrategy.java @@ -17,12 +17,11 @@ */ package org.apache.cassandra.locator; -import java.util.ArrayList; import java.util.Collections; import java.util.Collection; -import java.util.List; import java.util.Map; +import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.dht.RingPosition; import org.apache.cassandra.dht.Token; @@ -30,32 +29,40 @@ import org.apache.cassandra.utils.FBUtilities; public class LocalStrategy extends AbstractReplicationStrategy { + private static final ReplicationFactor RF = ReplicationFactor.fullOnly(1); + private final EndpointsForRange replicas; + public LocalStrategy(String keyspaceName, TokenMetadata tokenMetadata, IEndpointSnitch snitch, Map<String, String> configOptions) { super(keyspaceName, tokenMetadata, snitch, configOptions); + replicas = EndpointsForRange.of( + new Replica(FBUtilities.getBroadcastAddressAndPort(), + DatabaseDescriptor.getPartitioner().getMinimumToken(), + DatabaseDescriptor.getPartitioner().getMinimumToken(), + true + ) + ); } /** - * We need to override this even if we override calculateNaturalEndpoints, + * We need to override this even if we override calculateNaturalReplicas, * because the default implementation depends on token calculations but * LocalStrategy may be used before tokens are set up. */ @Override - public ArrayList<InetAddressAndPort> getNaturalEndpoints(RingPosition searchPosition) + public EndpointsForRange getNaturalReplicas(RingPosition searchPosition) { - ArrayList<InetAddressAndPort> l = new ArrayList<InetAddressAndPort>(1); - l.add(FBUtilities.getBroadcastAddressAndPort()); - return l; + return replicas; } - public List<InetAddressAndPort> calculateNaturalEndpoints(Token token, TokenMetadata metadata) + public EndpointsForRange calculateNaturalReplicas(Token token, TokenMetadata metadata) { - return Collections.singletonList(FBUtilities.getBroadcastAddressAndPort()); + return replicas; } - public int getReplicationFactor() + public ReplicationFactor getReplicationFactor() { - return 1; + return RF; } public void validateOptions() throws ConfigurationException --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org