This is an automated email from the ASF dual-hosted git repository.
blerer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new f661824 Batch the token metadata update to improve the speed
f661824 is described below
commit f66182415b5a778f1420777fd87a345b8942bc3f
Author: Jay Zhuang <[email protected]>
AuthorDate: Thu Jul 29 15:13:19 2021 +0200
Batch the token metadata update to improve the speed
patch by Jay Zhuang; reviewed by Benjamin Lerer and Brandon Williams for
CASSANDRA-15291
---
CHANGES.txt | 1 +
.../apache/cassandra/locator/TokenMetadata.java | 55 +++++++++++++++-------
.../apache/cassandra/service/StorageService.java | 9 ++--
3 files changed, 45 insertions(+), 20 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index 10f39a6..c41bfac 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.1
+ * Batch the token metadata update to improve the speed (CASSANDRA-15291)
* Reduce the log level on "expected" repair exceptions (CASSANDRA-16775)
* Make JMXTimer expose attributes using consistent time unit (CASSANDRA-16760)
* Remove check on gossip status from DynamicEndpointSnitch::updateScores (CASSANDRA-11671)
diff --git a/src/java/org/apache/cassandra/locator/TokenMetadata.java b/src/java/org/apache/cassandra/locator/TokenMetadata.java
index f2bbb9f..1e7a22f 100644
--- a/src/java/org/apache/cassandra/locator/TokenMetadata.java
+++ b/src/java/org/apache/cassandra/locator/TokenMetadata.java
@@ -182,8 +182,7 @@ public class TokenMetadata
public void updateNormalTokens(Collection<Token> tokens, InetAddressAndPort endpoint)
{
Multimap<InetAddressAndPort, Token> endpointTokens = HashMultimap.create();
- for (Token token : tokens)
- endpointTokens.put(endpoint, token);
+ endpointTokens.putAll(endpoint, tokens);
updateNormalTokens(endpointTokens);
}
@@ -250,23 +249,24 @@ public class TokenMetadata
lock.writeLock().lock();
try
{
- InetAddressAndPort storedEp = endpointToHostIdMap.inverse().get(hostId);
- if (storedEp != null)
- {
- if (!storedEp.equals(endpoint) && (FailureDetector.instance.isAlive(storedEp)))
- {
- throw new RuntimeException(String.format("Host ID collision between active endpoint %s and %s (id=%s)",
- storedEp,
- endpoint,
- hostId));
- }
- }
+ updateEndpointToHostIdMap(hostId, endpoint);
+ }
+ finally
+ {
+ lock.writeLock().unlock();
+ }
- UUID storedId = endpointToHostIdMap.get(endpoint);
- if ((storedId != null) && (!storedId.equals(hostId)))
- logger.warn("Changing {}'s host ID from {} to {}", endpoint, storedId, hostId);
+ }
- endpointToHostIdMap.forcePut(endpoint, hostId);
+ public void updateHostIds(Map<UUID, InetAddressAndPort> hostIdToEndpointMap)
+ {
+ lock.writeLock().lock();
+ try
+ {
+ for (Map.Entry<UUID, InetAddressAndPort> entry : hostIdToEndpointMap.entrySet())
+ {
+ updateEndpointToHostIdMap(entry.getKey(), entry.getValue());
+ }
}
finally
{
@@ -274,6 +274,27 @@ public class TokenMetadata
}
}
+
+ private void updateEndpointToHostIdMap(UUID hostId, InetAddressAndPort endpoint)
+ {
+ InetAddressAndPort storedEp = endpointToHostIdMap.inverse().get(hostId);
+ if (storedEp != null)
+ {
+ if (!storedEp.equals(endpoint) && (FailureDetector.instance.isAlive(storedEp)))
+ {
+ throw new RuntimeException(String.format("Host ID collision between active endpoint %s and %s (id=%s)",
+ storedEp,
+ endpoint,
+ hostId));
+ }
+ }
+
+ UUID storedId = endpointToHostIdMap.get(endpoint);
+ if ((storedId != null) && (!storedId.equals(hostId)))
+ logger.warn("Changing {}'s host ID from {} to {}", endpoint, storedId, hostId);
+
+ endpointToHostIdMap.forcePut(endpoint, hostId);
+ }
/** Return the unique host ID for an end-point. */
public UUID getHostId(InetAddressAndPort endpoint)
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 14c54e0..f4978f9 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -841,12 +841,15 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
SystemKeyspace.removeEndpoint(FBUtilities.getBroadcastAddressAndPort());
Map<InetAddressAndPort, UUID> loadedHostIds = SystemKeyspace.loadHostIds();
+ Map<UUID, InetAddressAndPort> hostIdToEndpointMap = new HashMap<>();
for (InetAddressAndPort ep : loadedTokens.keySet())
{
- tokenMetadata.updateNormalTokens(loadedTokens.get(ep), ep);
- if (loadedHostIds.containsKey(ep))
- tokenMetadata.updateHostId(loadedHostIds.get(ep), ep);
+ UUID hostId = loadedHostIds.get(ep);
+ if (hostId != null)
+ hostIdToEndpointMap.put(hostId, ep);
}
+ tokenMetadata.updateNormalTokens(loadedTokens);
+ tokenMetadata.updateHostIds(hostIdToEndpointMap);
}
private boolean isReplacing()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]