This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch cep-21-tcm
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 2eac1151bd26b61fa60f0c4fe46a3f034f5acef4
Author: Sam Tunnicliffe <[email protected]>
AuthorDate: Thu Mar 2 19:20:39 2023 +0000

    [CEP-21] Update ColumnFamilyStore (4/7)
    
    Part 4 of 7 modifications to ColumnFamilyStore, mostly related to:
    * ShardBoundaries
    * DiskBoundaries
    
    Co-authored-by: Marcus Eriksson <[email protected]>
    Co-authored-by: Alex Petrov <[email protected]>
    Co-authored-by: Sam Tunnicliffe <[email protected]>
---
 .../org/apache/cassandra/db/ColumnFamilyStore.java |  28 +++--
 .../org/apache/cassandra/db/DiskBoundaries.java    |  23 ++--
 .../apache/cassandra/db/DiskBoundaryManager.java   |  84 ++++++++------
 .../cassandra/db/memtable/ShardBoundaries.java     |  13 ++-
 .../org/apache/cassandra/tcm/ClusterMetadata.java  | 124 +++++++++++++--------
 5 files changed, 168 insertions(+), 104 deletions(-)
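
The recurring pattern in this change is that cached shard and disk boundaries are stamped with the TCM Epoch at which they were computed, replacing the old TokenMetadata ring version; the sentinel Epoch.EMPTY marks boundaries that never need recomputation (local system tables, or tables whose partitioner differs from the database's). A minimal, hypothetical sketch of that invalidation rule, with a plain long standing in for org.apache.cassandra.tcm.Epoch (class and field names below are illustrative, not the patch's API):

// Illustrative sketch only; not part of the patch.
final class EpochStampedBoundaries
{
    static final long EMPTY_EPOCH = -1;      // stand-in for Epoch.EMPTY: "never recompute"

    final java.util.List<String> boundaries; // stand-in for List<Token>
    final long epoch;                        // epoch at which the boundaries were computed

    EpochStampedBoundaries(java.util.List<String> boundaries, long epoch)
    {
        this.boundaries = boundaries;
        this.epoch = epoch;
    }

    // Stale once the cluster metadata epoch has advanced, unless stamped EMPTY.
    boolean isOutOfDate(long currentEpoch)
    {
        return epoch != EMPTY_EPOCH && epoch != currentEpoch;
    }
}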

diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
index 0affba5814..99e0bcde68 100644
--- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
+++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -90,14 +90,14 @@ import org.apache.cassandra.db.compaction.CompactionStrategyManager;
 import org.apache.cassandra.db.compaction.OperationType;
 import org.apache.cassandra.db.filter.ClusteringIndexFilter;
 import org.apache.cassandra.db.filter.DataLimits;
+import org.apache.cassandra.db.memtable.Flushing;
+import org.apache.cassandra.db.memtable.Memtable;
+import org.apache.cassandra.db.memtable.ShardBoundaries;
 import org.apache.cassandra.db.lifecycle.LifecycleNewTracker;
 import org.apache.cassandra.db.lifecycle.LifecycleTransaction;
 import org.apache.cassandra.db.lifecycle.SSTableSet;
 import org.apache.cassandra.db.lifecycle.Tracker;
 import org.apache.cassandra.db.lifecycle.View;
-import org.apache.cassandra.db.memtable.Flushing;
-import org.apache.cassandra.db.memtable.Memtable;
-import org.apache.cassandra.db.memtable.ShardBoundaries;
 import org.apache.cassandra.db.partitions.CachedPartition;
 import org.apache.cassandra.db.partitions.PartitionUpdate;
 import org.apache.cassandra.db.repair.CassandraTableRepairManager;
@@ -162,6 +162,7 @@ import org.apache.cassandra.service.snapshot.SnapshotManifest;
 import org.apache.cassandra.service.snapshot.TableSnapshot;
 import org.apache.cassandra.streaming.TableStreamManager;
 import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.utils.ByteBufferUtil;
 import org.apache.cassandra.utils.DefaultValue;
 import org.apache.cassandra.utils.ExecutorUtils;
@@ -520,6 +521,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
             data.addInitialSSTablesWithoutUpdatingSize(sstables, this);
         }
 
+        // compaction strategy should be created after the CFS has been prepared
         compactionStrategyManager = new CompactionStrategyManager(this);
 
         if (maxCompactionThreshold.value() <= 0 || minCompactionThreshold.value() <=0)
@@ -1460,17 +1462,16 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
 
         if (shardBoundaries == null ||
             shardBoundaries.shardCount() != shardCount ||
-            shardBoundaries.ringVersion != -1 && shardBoundaries.ringVersion != StorageService.instance.getTokenMetadata().getRingVersion())
+            !shardBoundaries.epoch.equals(Epoch.EMPTY) && !shardBoundaries.epoch.equals(metadata.epoch))
         {
             List<Splitter.WeightedRange> weightedRanges;
-            long ringVersion;
+            Epoch epoch;
             if (!SchemaConstants.isLocalSystemKeyspace(keyspace.getName())
-                && getPartitioner() == metadata.current().partitioner)
+                && getPartitioner() == metadata.current().tokenMap.partitioner())
             {
                 DiskBoundaryManager.VersionedRangesAtEndpoint versionedLocalRanges = DiskBoundaryManager.getVersionedLocalRanges(this);
                 Set<Range<Token>> localRanges = versionedLocalRanges.rangesAtEndpoint.ranges();
-                ringVersion = versionedLocalRanges.ringVersion;
-
+                epoch = versionedLocalRanges.epoch;
 
                 if (!localRanges.isEmpty())
                 {
@@ -1494,12 +1495,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
                 // Local tables need to cover the full token range and don't care about ring changes.
                 // We also end up here if the table's partitioner is not the database's, which can happen in tests.
                 weightedRanges = fullWeightedRange();
-                ringVersion = -1;
+                epoch = Epoch.EMPTY;
             }
 
             List<Token> boundaries = getPartitioner().splitter().get().splitOwnedRanges(shardCount, weightedRanges, false);
             shardBoundaries = new ShardBoundaries(boundaries.subList(0, boundaries.size() - 1),
-                                                  ringVersion);
+                                                  epoch);
             cachedShardBoundaries = shardBoundaries;
             logger.debug("Memtable shard boundaries for {}.{}: {}", 
keyspace.getName(), getTableName(), boundaries);
         }
@@ -3300,7 +3301,12 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean, Memtable.Owner
 
     public DiskBoundaries getDiskBoundaries()
     {
-        return diskBoundaryManager.getDiskBoundaries(this);
+        return diskBoundaryManager.getDiskBoundaries(this, metadata.get());
+    }
+
+    public DiskBoundaries getDiskBoundaries(TableMetadata initialMetadata)
+    {
+        return diskBoundaryManager.getDiskBoundaries(this, initialMetadata);
     }
 
     public void invalidateLocalRanges()
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaries.java b/src/java/org/apache/cassandra/db/DiskBoundaries.java
index 32edcac433..421209d6f3 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaries.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaries.java
@@ -27,28 +27,32 @@ import com.google.common.collect.ImmutableList;
 
 import org.apache.cassandra.io.sstable.Descriptor;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
-import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tcm.Epoch;
 
 public class DiskBoundaries
 {
     public final List<Directories.DataDirectory> directories;
     public final ImmutableList<PartitionPosition> positions;
-    final long ringVersion;
+    final Epoch epoch;
     final int directoriesVersion;
     private final ColumnFamilyStore cfs;
     private volatile boolean isInvalid = false;
 
     public DiskBoundaries(ColumnFamilyStore cfs, Directories.DataDirectory[] directories, int diskVersion)
     {
-        this(cfs, directories, null, -1, diskVersion);
+        this(cfs, directories, null, Epoch.EMPTY, diskVersion);
     }
 
     @VisibleForTesting
-    public DiskBoundaries(ColumnFamilyStore cfs, Directories.DataDirectory[] directories, List<PartitionPosition> positions, long ringVersion, int diskVersion)
+    public DiskBoundaries(ColumnFamilyStore cfs,
+                          Directories.DataDirectory[] directories,
+                          List<PartitionPosition> positions,
+                          Epoch epoch,
+                          int diskVersion)
     {
         this.directories = directories == null ? null : ImmutableList.copyOf(directories);
         this.positions = positions == null ? null : ImmutableList.copyOf(positions);
-        this.ringVersion = ringVersion;
+        this.epoch = epoch;
         this.directoriesVersion = diskVersion;
         this.cfs = cfs;
     }
@@ -60,7 +64,7 @@ public class DiskBoundaries
 
         DiskBoundaries that = (DiskBoundaries) o;
 
-        if (ringVersion != that.ringVersion) return false;
+        if (!epoch.equals(that.epoch)) return false;
         if (directoriesVersion != that.directoriesVersion) return false;
         if (!directories.equals(that.directories)) return false;
         return positions != null ? positions.equals(that.positions) : that.positions == null;
@@ -70,7 +74,7 @@ public class DiskBoundaries
     {
         int result = directories != null ? directories.hashCode() : 0;
         result = 31 * result + (positions != null ? positions.hashCode() : 0);
-        result = 31 * result + (int) (ringVersion ^ (ringVersion >>> 32));
+        result = 31 * result + epoch.hashCode();
         result = 31 * result + directoriesVersion;
         return result;
     }
@@ -80,7 +84,7 @@ public class DiskBoundaries
         return "DiskBoundaries{" +
                "directories=" + directories +
                ", positions=" + positions +
-               ", ringVersion=" + ringVersion +
+               ", epoch=" + epoch +
                ", directoriesVersion=" + directoriesVersion +
                '}';
     }
@@ -93,8 +97,7 @@ public class DiskBoundaries
         if (isInvalid)
             return true;
         int currentDiskVersion = DisallowedDirectories.getDirectoriesVersion();
-        long currentRingVersion = StorageService.instance.getTokenMetadata().getRingVersion();
-        return currentDiskVersion != directoriesVersion || (ringVersion != -1 && currentRingVersion != ringVersion);
+        return currentDiskVersion != directoriesVersion;
     }
 
     public void invalidate()
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
index 0de745d3cf..b426d69077 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
@@ -31,9 +31,12 @@ import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Splitter;
 import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.locator.RangesAtEndpoint;
-import org.apache.cassandra.locator.TokenMetadata;
-import org.apache.cassandra.service.PendingRangeCalculatorService;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
 import org.apache.cassandra.utils.FBUtilities;
 
 public class DiskBoundaryManager
@@ -43,18 +46,24 @@ public class DiskBoundaryManager
 
     public DiskBoundaries getDiskBoundaries(ColumnFamilyStore cfs)
     {
-        if (!cfs.getPartitioner().splitter().isPresent())
+        return getDiskBoundaries(cfs, cfs.metadata());
+    }
+
+    public DiskBoundaries getDiskBoundaries(ColumnFamilyStore cfs, TableMetadata metadata)
+    {
+        if (!metadata.partitioner.splitter().isPresent())
             return new DiskBoundaries(cfs, cfs.getDirectories().getWriteableLocations(), DisallowedDirectories.getDirectoriesVersion());
+
         if (diskBoundaries == null || diskBoundaries.isOutOfDate())
         {
             synchronized (this)
             {
                 if (diskBoundaries == null || diskBoundaries.isOutOfDate())
                 {
-                    logger.debug("Refreshing disk boundary cache for {}.{}", 
cfs.keyspace.getName(), cfs.getTableName());
+                    logger.trace("Refreshing disk boundary cache for {}.{}", 
cfs.keyspace.getName(), cfs.getTableName());
                     DiskBoundaries oldBoundaries = diskBoundaries;
-                    diskBoundaries = getDiskBoundaryValue(cfs);
-                    logger.debug("Updating boundaries from {} to {} for 
{}.{}", oldBoundaries, diskBoundaries, cfs.keyspace.getName(), 
cfs.getTableName());
+                    diskBoundaries = getDiskBoundaryValue(cfs, 
metadata.partitioner);
+                    logger.trace("Updating boundaries from {} to {} for 
{}.{}", oldBoundaries, diskBoundaries, cfs.keyspace.getName(), 
cfs.getTableName());
                 }
             }
         }
@@ -70,12 +79,12 @@ public class DiskBoundaryManager
     static class VersionedRangesAtEndpoint
     {
         public final RangesAtEndpoint rangesAtEndpoint;
-        public final long ringVersion;
+        public final Epoch epoch;
 
-        VersionedRangesAtEndpoint(RangesAtEndpoint rangesAtEndpoint, long ringVersion)
+        VersionedRangesAtEndpoint(RangesAtEndpoint rangesAtEndpoint, Epoch epoch)
         {
             this.rangesAtEndpoint = rangesAtEndpoint;
-            this.ringVersion = ringVersion;
+            this.epoch = epoch;
         }
     }
 
@@ -83,26 +92,35 @@ public class DiskBoundaryManager
     {
         RangesAtEndpoint localRanges;
 
-        long ringVersion;
-        TokenMetadata tmd;
+        Epoch epoch;
+        ClusterMetadata metadata;
         do
         {
-            tmd = StorageService.instance.getTokenMetadata();
-            ringVersion = tmd.getRingVersion();
-            localRanges = getLocalRanges(cfs, tmd);
-            logger.debug("Got local ranges {} (ringVersion = {})", 
localRanges, ringVersion);
+            metadata = ClusterMetadata.current();
+            epoch = metadata.epoch;
+            localRanges = getLocalRanges(cfs, metadata);
+            logger.debug("Got local ranges {} (epoch = {})", localRanges, 
epoch);
         }
-        while (ringVersion != tmd.getRingVersion()); // if ringVersion is different here it means that
-        // it might have changed before we calculated localRanges - recalculate
-
-        return new VersionedRangesAtEndpoint(localRanges, ringVersion);
+        while (!metadata.epoch.equals(ClusterMetadata.current().epoch)); // if epoch is different here it means that
+                                                                         // it might have changed before we calculated localRanges - recalculate
+        return new VersionedRangesAtEndpoint(localRanges, epoch);
     }
 
-    private static DiskBoundaries getDiskBoundaryValue(ColumnFamilyStore cfs)
+    private static DiskBoundaries getDiskBoundaryValue(ColumnFamilyStore cfs, IPartitioner partitioner)
     {
-        VersionedRangesAtEndpoint rangesAtEndpoint = getVersionedLocalRanges(cfs);
-        RangesAtEndpoint localRanges = rangesAtEndpoint.rangesAtEndpoint;
-        long ringVersion = rangesAtEndpoint.ringVersion;
+        if (ClusterMetadataService.instance() == null)
+            return new DiskBoundaries(cfs, cfs.getDirectories().getWriteableLocations(), null, Epoch.EMPTY, DisallowedDirectories.getDirectoriesVersion());
+
+        RangesAtEndpoint localRanges;
+
+        ClusterMetadata metadata;
+        do
+        {
+            metadata = ClusterMetadata.current();
+            localRanges = getLocalRanges(cfs, metadata);
+            logger.debug("Got local ranges {} (epoch = {})", localRanges, 
metadata.epoch);
+        }
+        while (metadata.epoch != ClusterMetadata.current().epoch);
 
         int directoriesVersion;
         Directories.DataDirectory[] dirs;
@@ -114,29 +132,31 @@ public class DiskBoundaryManager
         while (directoriesVersion != DisallowedDirectories.getDirectoriesVersion()); // if directoriesVersion has changed we need to recalculate
 
         if (localRanges == null || localRanges.isEmpty())
-            return new DiskBoundaries(cfs, dirs, null, ringVersion, directoriesVersion);
+            return new DiskBoundaries(cfs, dirs, null, metadata.epoch, directoriesVersion);
 
-        List<PartitionPosition> positions = getDiskBoundaries(localRanges, cfs.getPartitioner(), dirs);
+        List<PartitionPosition> positions = getDiskBoundaries(localRanges, partitioner, dirs);
 
-        return new DiskBoundaries(cfs, dirs, positions, ringVersion, directoriesVersion);
+        return new DiskBoundaries(cfs, dirs, positions, metadata.epoch, directoriesVersion);
     }
 
-    private static RangesAtEndpoint getLocalRanges(ColumnFamilyStore cfs, TokenMetadata tmd)
+
+    private static RangesAtEndpoint getLocalRanges(ColumnFamilyStore cfs, ClusterMetadata metadata)
     {
         RangesAtEndpoint localRanges;
+        DataPlacement placement;
         if (StorageService.instance.isBootstrapMode()
-        && !StorageService.isReplacingSameAddress()) // When replacing same address, the node marks itself as UN locally
+            && !StorageService.isReplacingSameAddress()) // When replacing same address, the node marks itself as UN locally
         {
-            PendingRangeCalculatorService.instance.blockUntilFinished();
-            localRanges = tmd.getPendingRanges(cfs.keyspace.getName(), FBUtilities.getBroadcastAddressAndPort());
+            placement = metadata.placements.get(cfs.keyspace.getMetadata().params.replication);
         }
         else
         {
-            // Reason we use use the future settled TMD is that if we decommission a node, we want to stream
+            // Reason we use use the future settled metadata is that if we decommission a node, we want to stream
             // from that node to the correct location on disk, if we didn't, we would put new files in the wrong places.
             // We do this to minimize the amount of data we need to move in rebalancedisks once everything settled
-            localRanges = cfs.keyspace.getReplicationStrategy().getAddressReplicas(tmd.cloneAfterAllSettled(), FBUtilities.getBroadcastAddressAndPort());
+            placement = metadata.writePlacementAllSettled(cfs.keyspace.getMetadata());
         }
+        localRanges = placement.writes.byEndpoint().get(FBUtilities.getBroadcastAddressAndPort());
         return localRanges;
     }
 
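
Both getVersionedLocalRanges and getDiskBoundaryValue above follow the same optimistic pattern: snapshot ClusterMetadata.current(), derive the local ranges from that snapshot, and retry if the epoch advanced in the meantime. A standalone sketch of that loop, using Supplier/Function stand-ins rather than the project's actual types (none of the names below come from the patch):

import java.util.function.Function;
import java.util.function.Supplier;

final class StableEpochCompute
{
    // Recompute until the epoch observed before and after the computation matches,
    // so the result is consistent with a single metadata snapshot.
    static <M, R> R computeUnderStableEpoch(Supplier<M> currentMetadata,
                                            Function<M, Long> epochOf,
                                            Function<M, R> compute)
    {
        M snapshot;
        R result;
        do
        {
            snapshot = currentMetadata.get();
            result = compute.apply(snapshot);
        }
        while (!epochOf.apply(snapshot).equals(epochOf.apply(currentMetadata.get())));
        return result;
    }
}
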
diff --git a/src/java/org/apache/cassandra/db/memtable/ShardBoundaries.java b/src/java/org/apache/cassandra/db/memtable/ShardBoundaries.java
index 864899f6a4..5412c2cf62 100644
--- a/src/java/org/apache/cassandra/db/memtable/ShardBoundaries.java
+++ b/src/java/org/apache/cassandra/db/memtable/ShardBoundaries.java
@@ -25,6 +25,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.tcm.Epoch;
 
 /**
  * Holds boundaries (tokens) used to map a particular token (so partition key) to a shard id.
@@ -43,21 +44,21 @@ public class ShardBoundaries
     // - there is only 1 shard configured
     // - the default partitioner doesn't support splitting
     // - the keyspace is local system keyspace
-    public static final ShardBoundaries NONE = new ShardBoundaries(EMPTY_TOKEN_ARRAY, -1);
+    public static final ShardBoundaries NONE = new ShardBoundaries(EMPTY_TOKEN_ARRAY, Epoch.EMPTY);
 
     private final Token[] boundaries;
-    public final long ringVersion;
+    public final Epoch epoch;
 
     @VisibleForTesting
-    public ShardBoundaries(Token[] boundaries, long ringVersion)
+    public ShardBoundaries(Token[] boundaries, Epoch epoch)
     {
         this.boundaries = boundaries;
-        this.ringVersion = ringVersion;
+        this.epoch = epoch;
     }
 
-    public ShardBoundaries(List<Token> boundaries, long ringVersion)
+    public ShardBoundaries(List<Token> boundaries, Epoch epoch)
     {
-        this(boundaries.toArray(EMPTY_TOKEN_ARRAY), ringVersion);
+        this(boundaries.toArray(EMPTY_TOKEN_ARRAY), epoch);
     }
 
     /**
diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
index 844e87eb53..221b0436ca 100644
--- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
+++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
@@ -45,6 +45,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.schema.DistributedSchema;
 import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.Keyspaces;
 import org.apache.cassandra.schema.ReplicationParams;
 import org.apache.cassandra.tcm.extensions.ExtensionKey;
 import org.apache.cassandra.tcm.extensions.ExtensionValue;
@@ -54,6 +55,7 @@ import org.apache.cassandra.tcm.membership.NodeAddresses;
 import org.apache.cassandra.tcm.membership.NodeId;
 import org.apache.cassandra.tcm.membership.NodeState;
 import org.apache.cassandra.tcm.membership.NodeVersion;
+import org.apache.cassandra.tcm.ownership.DataPlacement;
 import org.apache.cassandra.tcm.ownership.DataPlacements;
 import org.apache.cassandra.tcm.ownership.PrimaryRangeComparator;
 import org.apache.cassandra.tcm.ownership.PlacementForRange;
@@ -146,51 +148,6 @@ public class ClusterMetadata
         this.fullCMSEndpoints = ImmutableSet.copyOf(placements.get(ReplicationParams.meta()).reads.byEndpoint().keySet());
     }
 
-    public boolean hasPendingRangesFor(KeyspaceMetadata ksm, Token token)
-    {
-        DataPlacements placement = current().placements;
-        PlacementForRange writes = placement.get(ksm.params.replication).writes;
-        PlacementForRange reads = placement.get(ksm.params.replication).reads;
-        return !reads.forToken(token).equals(writes.forToken(token));
-    }
-
-    public boolean hasPendingRangesFor(KeyspaceMetadata ksm, InetAddressAndPort endpoint)
-    {
-        DataPlacements placement = current().placements;
-        PlacementForRange writes = placement.get(ksm.params.replication).writes;
-        PlacementForRange reads = placement.get(ksm.params.replication).reads;
-        return !writes.byEndpoint().get(endpoint).equals(reads.byEndpoint().get(endpoint));
-    }
-
-    public Collection<Range<Token>> localWriteRanges(KeyspaceMetadata metadata)
-    {
-        return placements.get(metadata.params.replication).writes.byEndpoint().get(FBUtilities.getBroadcastAddressAndPort()).ranges();
-    }
-
-    public Map<Range<Token>, EndpointsForRange> pendingRanges(KeyspaceMetadata metadata)
-    {
-        Map<Range<Token>, EndpointsForRange> map = new HashMap<>();
-        List<Range<Token>> pending = new ArrayList<>(placements.get(metadata.params.replication).writes.ranges());
-        pending.removeAll(placements.get(metadata.params.replication).reads.ranges());
-        for (Range<Token> p : pending)
-            map.put(p, placements.get(metadata.params.replication).writes.forRange(p));
-        return map;
-    }
-
-    public EndpointsForToken pendingEndpointsFor(KeyspaceMetadata metadata, Token t)
-    {
-        EndpointsForToken writeEndpoints = placements.get(metadata.params.replication).writes.forToken(t);
-        EndpointsForToken readEndpoints = placements.get(metadata.params.replication).reads.forToken(t);
-        EndpointsForToken.Builder endpointsForToken = writeEndpoints.newBuilder(writeEndpoints.size() - readEndpoints.size());
-
-        for (Replica writeReplica : writeEndpoints)
-        {
-            if (!readEndpoints.contains(writeReplica))
-                endpointsForToken.add(writeReplica);
-        }
-        return endpointsForToken.build();
-    }
-
     public Set<InetAddressAndPort> fullCMSMembers()
     {
         return fullCMSEndpoints;
@@ -241,6 +198,83 @@ public class ClusterMetadata
         return lastInPeriod ? period + 1 : period;
     }
 
+    public DataPlacement writePlacementAllSettled(KeyspaceMetadata ksm)
+    {
+        List<NodeId> leaving = new ArrayList<>();
+        List<NodeId> moving = new ArrayList<>();
+
+        for (Map.Entry<NodeId, NodeState> entry : directory.states.entrySet())
+        {
+            switch (entry.getValue())
+            {
+                case LEAVING:
+                    leaving.add(entry.getKey());
+                    break;
+                case MOVING:
+                    moving.add(entry.getKey());
+                    break;
+            }
+        }
+
+        Transformer t = transformer();
+        for (NodeId node : leaving)
+            t = t.proposeRemoveNode(node);
+        // todo: add tests for move!
+        for (NodeId node : moving)
+            t = t.proposeRemoveNode(node).proposeToken(node, tokenMap.tokens(node));
+
+        ClusterMetadata proposed = t.build().metadata;
+        return ClusterMetadataService.instance()
+                                     .placementProvider()
+                                     .calculatePlacements(proposed.tokenMap.toRanges(), proposed, Keyspaces.of(ksm))
+                                     .get(ksm.params.replication);
+    }
+
+    public boolean hasPendingRangesFor(KeyspaceMetadata ksm, Token token)
+    {
+        DataPlacements placement = current().placements;
+        PlacementForRange writes = placement.get(ksm.params.replication).writes;
+        PlacementForRange reads = placement.get(ksm.params.replication).reads;
+        return !reads.forToken(token).equals(writes.forToken(token));
+    }
+
+    public boolean hasPendingRangesFor(KeyspaceMetadata ksm, InetAddressAndPort endpoint)
+    {
+        DataPlacements placement = current().placements;
+        PlacementForRange writes = placement.get(ksm.params.replication).writes;
+        PlacementForRange reads = placement.get(ksm.params.replication).reads;
+        return !writes.byEndpoint().get(endpoint).equals(reads.byEndpoint().get(endpoint));
+    }
+
+    public Collection<Range<Token>> localWriteRanges(KeyspaceMetadata metadata)
+    {
+        return placements.get(metadata.params.replication).writes.byEndpoint().get(FBUtilities.getBroadcastAddressAndPort()).ranges();
+    }
+
+    public Map<Range<Token>, EndpointsForRange> pendingRanges(KeyspaceMetadata metadata)
+    {
+        Map<Range<Token>, EndpointsForRange> map = new HashMap<>();
+        List<Range<Token>> pending = new ArrayList<>(placements.get(metadata.params.replication).writes.ranges());
+        pending.removeAll(placements.get(metadata.params.replication).reads.ranges());
+        for (Range<Token> p : pending)
+            map.put(p, placements.get(metadata.params.replication).writes.forRange(p));
+        return map;
+    }
+
+    public EndpointsForToken pendingEndpointsFor(KeyspaceMetadata metadata, Token t)
+    {
+        EndpointsForToken writeEndpoints = placements.get(metadata.params.replication).writes.forToken(t);
+        EndpointsForToken readEndpoints = placements.get(metadata.params.replication).reads.forToken(t);
+        EndpointsForToken.Builder endpointsForToken = writeEndpoints.newBuilder(writeEndpoints.size() - readEndpoints.size());
+
+        for (Replica writeReplica : writeEndpoints)
+        {
+            if (!readEndpoints.contains(writeReplica))
+                endpointsForToken.add(writeReplica);
+        }
+        return endpointsForToken.build();
+    }
+
     public static class Transformer
     {
         private final ClusterMetadata base;

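The hasPendingRangesFor / pendingRanges / pendingEndpointsFor methods relocated above all treat "pending" as the portion of the write placement that is not yet part of the read placement. A tiny self-contained sketch of that set difference, with strings standing in for ranges (names below are illustrative, not the project's API):

import java.util.LinkedHashSet;
import java.util.Set;

final class PendingPlacementSketch
{
    // Pending = entries we must already write to but do not yet read from.
    static Set<String> pending(Set<String> writePlacement, Set<String> readPlacement)
    {
        Set<String> pending = new LinkedHashSet<>(writePlacement);
        pending.removeAll(readPlacement);
        return pending;
    }

    public static void main(String[] args)
    {
        Set<String> writes = Set.of("(0,100]", "(100,200]", "(200,300]");
        Set<String> reads = Set.of("(0,100]", "(100,200]");
        System.out.println(pending(writes, reads)); // prints [(200,300]]
    }
}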

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
