This is an automated email from the ASF dual-hosted git repository. samt pushed a commit to branch cep-21-tcm in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 2eac1151bd26b61fa60f0c4fe46a3f034f5acef4 Author: Sam Tunnicliffe <[email protected]> AuthorDate: Thu Mar 2 19:20:39 2023 +0000 [CEP-21] Update ColumnFamilyStore (4/7) Part 4 of 7 modifications to ColumnFamilyStore, mostly related to: * ShardBoundaries * DiskBoundaries Co-authored-by: Marcus Eriksson <[email protected]> Co-authored-by: Alex Petrov <[email protected]> Co-authored-by: Sam Tunnicliffe <[email protected]> --- .../org/apache/cassandra/db/ColumnFamilyStore.java | 28 +++-- .../org/apache/cassandra/db/DiskBoundaries.java | 23 ++-- .../apache/cassandra/db/DiskBoundaryManager.java | 84 ++++++++------ .../cassandra/db/memtable/ShardBoundaries.java | 13 ++- .../org/apache/cassandra/tcm/ClusterMetadata.java | 124 +++++++++++++-------- 5 files changed, 168 insertions(+), 104 deletions(-) diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 0affba5814..99e0bcde68 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -90,14 +90,14 @@ import org.apache.cassandra.db.compaction.CompactionStrategyManager; import org.apache.cassandra.db.compaction.OperationType; import org.apache.cassandra.db.filter.ClusteringIndexFilter; import org.apache.cassandra.db.filter.DataLimits; +import org.apache.cassandra.db.memtable.Flushing; +import org.apache.cassandra.db.memtable.Memtable; +import org.apache.cassandra.db.memtable.ShardBoundaries; import org.apache.cassandra.db.lifecycle.LifecycleNewTracker; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.db.lifecycle.Tracker; import org.apache.cassandra.db.lifecycle.View; -import org.apache.cassandra.db.memtable.Flushing; -import org.apache.cassandra.db.memtable.Memtable; -import org.apache.cassandra.db.memtable.ShardBoundaries; import org.apache.cassandra.db.partitions.CachedPartition; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.repair.CassandraTableRepairManager; @@ -162,6 +162,7 @@ import org.apache.cassandra.service.snapshot.SnapshotManifest; import org.apache.cassandra.service.snapshot.TableSnapshot; import org.apache.cassandra.streaming.TableStreamManager; import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.utils.ByteBufferUtil; import org.apache.cassandra.utils.DefaultValue; import org.apache.cassandra.utils.ExecutorUtils; @@ -520,6 +521,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner data.addInitialSSTablesWithoutUpdatingSize(sstables, this); } + // compaction strategy should be created after the CFS has been prepared compactionStrategyManager = new CompactionStrategyManager(this); if (maxCompactionThreshold.value() <= 0 || minCompactionThreshold.value() <=0) @@ -1460,17 +1462,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner if (shardBoundaries == null || shardBoundaries.shardCount() != shardCount || - shardBoundaries.ringVersion != -1 && shardBoundaries.ringVersion != StorageService.instance.getTokenMetadata().getRingVersion()) + !shardBoundaries.epoch.equals(Epoch.EMPTY) && !shardBoundaries.epoch.equals(metadata.epoch)) { List<Splitter.WeightedRange> weightedRanges; - long ringVersion; + Epoch epoch; if (!SchemaConstants.isLocalSystemKeyspace(keyspace.getName()) - && getPartitioner() == metadata.current().partitioner) + && getPartitioner() == metadata.current().tokenMap.partitioner()) { DiskBoundaryManager.VersionedRangesAtEndpoint versionedLocalRanges = DiskBoundaryManager.getVersionedLocalRanges(this); Set<Range<Token>> localRanges = versionedLocalRanges.rangesAtEndpoint.ranges(); - ringVersion = versionedLocalRanges.ringVersion; - + epoch = versionedLocalRanges.epoch; if (!localRanges.isEmpty()) { @@ -1494,12 +1495,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner // Local tables need to cover the full token range and don't care about ring changes. // We also end up here if the table's partitioner is not the database's, which can happen in tests. weightedRanges = fullWeightedRange(); - ringVersion = -1; + epoch = Epoch.EMPTY; } List<Token> boundaries = getPartitioner().splitter().get().splitOwnedRanges(shardCount, weightedRanges, false); shardBoundaries = new ShardBoundaries(boundaries.subList(0, boundaries.size() - 1), - ringVersion); + epoch); cachedShardBoundaries = shardBoundaries; logger.debug("Memtable shard boundaries for {}.{}: {}", keyspace.getName(), getTableName(), boundaries); } @@ -3300,7 +3301,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner public DiskBoundaries getDiskBoundaries() { - return diskBoundaryManager.getDiskBoundaries(this); + return diskBoundaryManager.getDiskBoundaries(this, metadata.get()); + } + + public DiskBoundaries getDiskBoundaries(TableMetadata initialMetadata) + { + return diskBoundaryManager.getDiskBoundaries(this, initialMetadata); } public void invalidateLocalRanges() diff --git a/src/java/org/apache/cassandra/db/DiskBoundaries.java b/src/java/org/apache/cassandra/db/DiskBoundaries.java index 32edcac433..421209d6f3 100644 --- a/src/java/org/apache/cassandra/db/DiskBoundaries.java +++ b/src/java/org/apache/cassandra/db/DiskBoundaries.java @@ -27,28 +27,32 @@ import com.google.common.collect.ImmutableList; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.format.SSTableReader; -import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.tcm.Epoch; public class DiskBoundaries { public final List<Directories.DataDirectory> directories; public final ImmutableList<PartitionPosition> positions; - final long ringVersion; + final Epoch epoch; final int directoriesVersion; private final ColumnFamilyStore cfs; private volatile boolean isInvalid = false; public DiskBoundaries(ColumnFamilyStore cfs, Directories.DataDirectory[] directories, int diskVersion) { - this(cfs, directories, null, -1, diskVersion); + this(cfs, directories, null, Epoch.EMPTY, diskVersion); } @VisibleForTesting - public DiskBoundaries(ColumnFamilyStore cfs, Directories.DataDirectory[] directories, List<PartitionPosition> positions, long ringVersion, int diskVersion) + public DiskBoundaries(ColumnFamilyStore cfs, + Directories.DataDirectory[] directories, + List<PartitionPosition> positions, + Epoch epoch, + int diskVersion) { this.directories = directories == null ? null : ImmutableList.copyOf(directories); this.positions = positions == null ? null : ImmutableList.copyOf(positions); - this.ringVersion = ringVersion; + this.epoch = epoch; this.directoriesVersion = diskVersion; this.cfs = cfs; } @@ -60,7 +64,7 @@ public class DiskBoundaries DiskBoundaries that = (DiskBoundaries) o; - if (ringVersion != that.ringVersion) return false; + if (!epoch.equals(that.epoch)) return false; if (directoriesVersion != that.directoriesVersion) return false; if (!directories.equals(that.directories)) return false; return positions != null ? positions.equals(that.positions) : that.positions == null; @@ -70,7 +74,7 @@ public class DiskBoundaries { int result = directories != null ? directories.hashCode() : 0; result = 31 * result + (positions != null ? positions.hashCode() : 0); - result = 31 * result + (int) (ringVersion ^ (ringVersion >>> 32)); + result = 31 * result + epoch.hashCode(); result = 31 * result + directoriesVersion; return result; } @@ -80,7 +84,7 @@ public class DiskBoundaries return "DiskBoundaries{" + "directories=" + directories + ", positions=" + positions + - ", ringVersion=" + ringVersion + + ", epoch=" + epoch + ", directoriesVersion=" + directoriesVersion + '}'; } @@ -93,8 +97,7 @@ public class DiskBoundaries if (isInvalid) return true; int currentDiskVersion = DisallowedDirectories.getDirectoriesVersion(); - long currentRingVersion = StorageService.instance.getTokenMetadata().getRingVersion(); - return currentDiskVersion != directoriesVersion || (ringVersion != -1 && currentRingVersion != ringVersion); + return currentDiskVersion != directoriesVersion; } public void invalidate() diff --git a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java index 0de745d3cf..b426d69077 100644 --- a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java +++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java @@ -31,9 +31,12 @@ import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Splitter; import org.apache.cassandra.dht.Token; import org.apache.cassandra.locator.RangesAtEndpoint; -import org.apache.cassandra.locator.TokenMetadata; -import org.apache.cassandra.service.PendingRangeCalculatorService; +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.ownership.DataPlacement; import org.apache.cassandra.utils.FBUtilities; public class DiskBoundaryManager @@ -43,18 +46,24 @@ public class DiskBoundaryManager public DiskBoundaries getDiskBoundaries(ColumnFamilyStore cfs) { - if (!cfs.getPartitioner().splitter().isPresent()) + return getDiskBoundaries(cfs, cfs.metadata()); + } + + public DiskBoundaries getDiskBoundaries(ColumnFamilyStore cfs, TableMetadata metadata) + { + if (!metadata.partitioner.splitter().isPresent()) return new DiskBoundaries(cfs, cfs.getDirectories().getWriteableLocations(), DisallowedDirectories.getDirectoriesVersion()); + if (diskBoundaries == null || diskBoundaries.isOutOfDate()) { synchronized (this) { if (diskBoundaries == null || diskBoundaries.isOutOfDate()) { - logger.debug("Refreshing disk boundary cache for {}.{}", cfs.keyspace.getName(), cfs.getTableName()); + logger.trace("Refreshing disk boundary cache for {}.{}", cfs.keyspace.getName(), cfs.getTableName()); DiskBoundaries oldBoundaries = diskBoundaries; - diskBoundaries = getDiskBoundaryValue(cfs); - logger.debug("Updating boundaries from {} to {} for {}.{}", oldBoundaries, diskBoundaries, cfs.keyspace.getName(), cfs.getTableName()); + diskBoundaries = getDiskBoundaryValue(cfs, metadata.partitioner); + logger.trace("Updating boundaries from {} to {} for {}.{}", oldBoundaries, diskBoundaries, cfs.keyspace.getName(), cfs.getTableName()); } } } @@ -70,12 +79,12 @@ public class DiskBoundaryManager static class VersionedRangesAtEndpoint { public final RangesAtEndpoint rangesAtEndpoint; - public final long ringVersion; + public final Epoch epoch; - VersionedRangesAtEndpoint(RangesAtEndpoint rangesAtEndpoint, long ringVersion) + VersionedRangesAtEndpoint(RangesAtEndpoint rangesAtEndpoint, Epoch epoch) { this.rangesAtEndpoint = rangesAtEndpoint; - this.ringVersion = ringVersion; + this.epoch = epoch; } } @@ -83,26 +92,35 @@ public class DiskBoundaryManager { RangesAtEndpoint localRanges; - long ringVersion; - TokenMetadata tmd; + Epoch epoch; + ClusterMetadata metadata; do { - tmd = StorageService.instance.getTokenMetadata(); - ringVersion = tmd.getRingVersion(); - localRanges = getLocalRanges(cfs, tmd); - logger.debug("Got local ranges {} (ringVersion = {})", localRanges, ringVersion); + metadata = ClusterMetadata.current(); + epoch = metadata.epoch; + localRanges = getLocalRanges(cfs, metadata); + logger.debug("Got local ranges {} (epoch = {})", localRanges, epoch); } - while (ringVersion != tmd.getRingVersion()); // if ringVersion is different here it means that - // it might have changed before we calculated localRanges - recalculate - - return new VersionedRangesAtEndpoint(localRanges, ringVersion); + while (!metadata.epoch.equals(ClusterMetadata.current().epoch)); // if epoch is different here it means that + // it might have changed before we calculated localRanges - recalculate + return new VersionedRangesAtEndpoint(localRanges, epoch); } - private static DiskBoundaries getDiskBoundaryValue(ColumnFamilyStore cfs) + private static DiskBoundaries getDiskBoundaryValue(ColumnFamilyStore cfs, IPartitioner partitioner) { - VersionedRangesAtEndpoint rangesAtEndpoint = getVersionedLocalRanges(cfs); - RangesAtEndpoint localRanges = rangesAtEndpoint.rangesAtEndpoint; - long ringVersion = rangesAtEndpoint.ringVersion; + if (ClusterMetadataService.instance() == null) + return new DiskBoundaries(cfs, cfs.getDirectories().getWriteableLocations(), null, Epoch.EMPTY, DisallowedDirectories.getDirectoriesVersion()); + + RangesAtEndpoint localRanges; + + ClusterMetadata metadata; + do + { + metadata = ClusterMetadata.current(); + localRanges = getLocalRanges(cfs, metadata); + logger.debug("Got local ranges {} (epoch = {})", localRanges, metadata.epoch); + } + while (metadata.epoch != ClusterMetadata.current().epoch); int directoriesVersion; Directories.DataDirectory[] dirs; @@ -114,29 +132,31 @@ public class DiskBoundaryManager while (directoriesVersion != DisallowedDirectories.getDirectoriesVersion()); // if directoriesVersion has changed we need to recalculate if (localRanges == null || localRanges.isEmpty()) - return new DiskBoundaries(cfs, dirs, null, ringVersion, directoriesVersion); + return new DiskBoundaries(cfs, dirs, null, metadata.epoch, directoriesVersion); - List<PartitionPosition> positions = getDiskBoundaries(localRanges, cfs.getPartitioner(), dirs); + List<PartitionPosition> positions = getDiskBoundaries(localRanges, partitioner, dirs); - return new DiskBoundaries(cfs, dirs, positions, ringVersion, directoriesVersion); + return new DiskBoundaries(cfs, dirs, positions, metadata.epoch, directoriesVersion); } - private static RangesAtEndpoint getLocalRanges(ColumnFamilyStore cfs, TokenMetadata tmd) + + private static RangesAtEndpoint getLocalRanges(ColumnFamilyStore cfs, ClusterMetadata metadata) { RangesAtEndpoint localRanges; + DataPlacement placement; if (StorageService.instance.isBootstrapMode() - && !StorageService.isReplacingSameAddress()) // When replacing same address, the node marks itself as UN locally + && !StorageService.isReplacingSameAddress()) // When replacing same address, the node marks itself as UN locally { - PendingRangeCalculatorService.instance.blockUntilFinished(); - localRanges = tmd.getPendingRanges(cfs.keyspace.getName(), FBUtilities.getBroadcastAddressAndPort()); + placement = metadata.placements.get(cfs.keyspace.getMetadata().params.replication); } else { - // Reason we use use the future settled TMD is that if we decommission a node, we want to stream + // Reason we use use the future settled metadata is that if we decommission a node, we want to stream // from that node to the correct location on disk, if we didn't, we would put new files in the wrong places. // We do this to minimize the amount of data we need to move in rebalancedisks once everything settled - localRanges = cfs.keyspace.getReplicationStrategy().getAddressReplicas(tmd.cloneAfterAllSettled(), FBUtilities.getBroadcastAddressAndPort()); + placement = metadata.writePlacementAllSettled(cfs.keyspace.getMetadata()); } + localRanges = placement.writes.byEndpoint().get(FBUtilities.getBroadcastAddressAndPort()); return localRanges; } diff --git a/src/java/org/apache/cassandra/db/memtable/ShardBoundaries.java b/src/java/org/apache/cassandra/db/memtable/ShardBoundaries.java index 864899f6a4..5412c2cf62 100644 --- a/src/java/org/apache/cassandra/db/memtable/ShardBoundaries.java +++ b/src/java/org/apache/cassandra/db/memtable/ShardBoundaries.java @@ -25,6 +25,7 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.dht.Token; +import org.apache.cassandra.tcm.Epoch; /** * Holds boundaries (tokens) used to map a particular token (so partition key) to a shard id. @@ -43,21 +44,21 @@ public class ShardBoundaries // - there is only 1 shard configured // - the default partitioner doesn't support splitting // - the keyspace is local system keyspace - public static final ShardBoundaries NONE = new ShardBoundaries(EMPTY_TOKEN_ARRAY, -1); + public static final ShardBoundaries NONE = new ShardBoundaries(EMPTY_TOKEN_ARRAY, Epoch.EMPTY); private final Token[] boundaries; - public final long ringVersion; + public final Epoch epoch; @VisibleForTesting - public ShardBoundaries(Token[] boundaries, long ringVersion) + public ShardBoundaries(Token[] boundaries, Epoch epoch) { this.boundaries = boundaries; - this.ringVersion = ringVersion; + this.epoch = epoch; } - public ShardBoundaries(List<Token> boundaries, long ringVersion) + public ShardBoundaries(List<Token> boundaries, Epoch epoch) { - this(boundaries.toArray(EMPTY_TOKEN_ARRAY), ringVersion); + this(boundaries.toArray(EMPTY_TOKEN_ARRAY), epoch); } /** diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java index 844e87eb53..221b0436ca 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java @@ -45,6 +45,7 @@ import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.Replica; import org.apache.cassandra.schema.DistributedSchema; import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.Keyspaces; import org.apache.cassandra.schema.ReplicationParams; import org.apache.cassandra.tcm.extensions.ExtensionKey; import org.apache.cassandra.tcm.extensions.ExtensionValue; @@ -54,6 +55,7 @@ import org.apache.cassandra.tcm.membership.NodeAddresses; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.membership.NodeState; import org.apache.cassandra.tcm.membership.NodeVersion; +import org.apache.cassandra.tcm.ownership.DataPlacement; import org.apache.cassandra.tcm.ownership.DataPlacements; import org.apache.cassandra.tcm.ownership.PrimaryRangeComparator; import org.apache.cassandra.tcm.ownership.PlacementForRange; @@ -146,51 +148,6 @@ public class ClusterMetadata this.fullCMSEndpoints = ImmutableSet.copyOf(placements.get(ReplicationParams.meta()).reads.byEndpoint().keySet()); } - public boolean hasPendingRangesFor(KeyspaceMetadata ksm, Token token) - { - DataPlacements placement = current().placements; - PlacementForRange writes = placement.get(ksm.params.replication).writes; - PlacementForRange reads = placement.get(ksm.params.replication).reads; - return !reads.forToken(token).equals(writes.forToken(token)); - } - - public boolean hasPendingRangesFor(KeyspaceMetadata ksm, InetAddressAndPort endpoint) - { - DataPlacements placement = current().placements; - PlacementForRange writes = placement.get(ksm.params.replication).writes; - PlacementForRange reads = placement.get(ksm.params.replication).reads; - return !writes.byEndpoint().get(endpoint).equals(reads.byEndpoint().get(endpoint)); - } - - public Collection<Range<Token>> localWriteRanges(KeyspaceMetadata metadata) - { - return placements.get(metadata.params.replication).writes.byEndpoint().get(FBUtilities.getBroadcastAddressAndPort()).ranges(); - } - - public Map<Range<Token>, EndpointsForRange> pendingRanges(KeyspaceMetadata metadata) - { - Map<Range<Token>, EndpointsForRange> map = new HashMap<>(); - List<Range<Token>> pending = new ArrayList<>(placements.get(metadata.params.replication).writes.ranges()); - pending.removeAll(placements.get(metadata.params.replication).reads.ranges()); - for (Range<Token> p : pending) - map.put(p, placements.get(metadata.params.replication).writes.forRange(p)); - return map; - } - - public EndpointsForToken pendingEndpointsFor(KeyspaceMetadata metadata, Token t) - { - EndpointsForToken writeEndpoints = placements.get(metadata.params.replication).writes.forToken(t); - EndpointsForToken readEndpoints = placements.get(metadata.params.replication).reads.forToken(t); - EndpointsForToken.Builder endpointsForToken = writeEndpoints.newBuilder(writeEndpoints.size() - readEndpoints.size()); - - for (Replica writeReplica : writeEndpoints) - { - if (!readEndpoints.contains(writeReplica)) - endpointsForToken.add(writeReplica); - } - return endpointsForToken.build(); - } - public Set<InetAddressAndPort> fullCMSMembers() { return fullCMSEndpoints; @@ -241,6 +198,83 @@ public class ClusterMetadata return lastInPeriod ? period + 1 : period; } + public DataPlacement writePlacementAllSettled(KeyspaceMetadata ksm) + { + List<NodeId> leaving = new ArrayList<>(); + List<NodeId> moving = new ArrayList<>(); + + for (Map.Entry<NodeId, NodeState> entry : directory.states.entrySet()) + { + switch (entry.getValue()) + { + case LEAVING: + leaving.add(entry.getKey()); + break; + case MOVING: + moving.add(entry.getKey()); + break; + } + } + + Transformer t = transformer(); + for (NodeId node : leaving) + t = t.proposeRemoveNode(node); + // todo: add tests for move! + for (NodeId node : moving) + t = t.proposeRemoveNode(node).proposeToken(node, tokenMap.tokens(node)); + + ClusterMetadata proposed = t.build().metadata; + return ClusterMetadataService.instance() + .placementProvider() + .calculatePlacements(proposed.tokenMap.toRanges(), proposed, Keyspaces.of(ksm)) + .get(ksm.params.replication); + } + + public boolean hasPendingRangesFor(KeyspaceMetadata ksm, Token token) + { + DataPlacements placement = current().placements; + PlacementForRange writes = placement.get(ksm.params.replication).writes; + PlacementForRange reads = placement.get(ksm.params.replication).reads; + return !reads.forToken(token).equals(writes.forToken(token)); + } + + public boolean hasPendingRangesFor(KeyspaceMetadata ksm, InetAddressAndPort endpoint) + { + DataPlacements placement = current().placements; + PlacementForRange writes = placement.get(ksm.params.replication).writes; + PlacementForRange reads = placement.get(ksm.params.replication).reads; + return !writes.byEndpoint().get(endpoint).equals(reads.byEndpoint().get(endpoint)); + } + + public Collection<Range<Token>> localWriteRanges(KeyspaceMetadata metadata) + { + return placements.get(metadata.params.replication).writes.byEndpoint().get(FBUtilities.getBroadcastAddressAndPort()).ranges(); + } + + public Map<Range<Token>, EndpointsForRange> pendingRanges(KeyspaceMetadata metadata) + { + Map<Range<Token>, EndpointsForRange> map = new HashMap<>(); + List<Range<Token>> pending = new ArrayList<>(placements.get(metadata.params.replication).writes.ranges()); + pending.removeAll(placements.get(metadata.params.replication).reads.ranges()); + for (Range<Token> p : pending) + map.put(p, placements.get(metadata.params.replication).writes.forRange(p)); + return map; + } + + public EndpointsForToken pendingEndpointsFor(KeyspaceMetadata metadata, Token t) + { + EndpointsForToken writeEndpoints = placements.get(metadata.params.replication).writes.forToken(t); + EndpointsForToken readEndpoints = placements.get(metadata.params.replication).reads.forToken(t); + EndpointsForToken.Builder endpointsForToken = writeEndpoints.newBuilder(writeEndpoints.size() - readEndpoints.size()); + + for (Replica writeReplica : writeEndpoints) + { + if (!readEndpoints.contains(writeReplica)) + endpointsForToken.add(writeReplica); + } + return endpointsForToken.build(); + } + public static class Transformer { private final ClusterMetadata base; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
