This is an automated email from the ASF dual-hosted git repository.
joao-r-reis pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-gocql-driver.git
The following commit(s) were added to refs/heads/trunk by this push:
new 40db71c9 Improve host_source locking and ring refresh concurrency
40db71c9 is described below
commit 40db71c980ef56bed79636c7db03520b7dee5008
Author: Dorian Jaminais-Grellier <[email protected]>
AuthorDate: Thu Jun 18 15:55:21 2026 +0200
Improve host_source locking and ring refresh concurrency
Remove ringDescriber mutex around GetHosts I/O; prevHosts/prevPartitioner
were never written. Use read locks for ConnectAddressAndPort and a
read-fast path for HostnameAndPort. Read peer validity under a single
RLock in isValidPeer. Release errorBroadcaster mutex before sending to
listeners.
remove useless lock in isValidPeer
Patch by Dorian Jaminais; reviewed by João Reis and Bohdan Siryk for
CASSGO-121
---
CHANGELOG.md | 1 +
host_source.go | 34 ++++++++++++++++++----------------
2 files changed, 19 insertions(+), 16 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 580094c6..3f132b5b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,6 +10,7 @@ and this project adheres to [Semantic
Versioning](https://semver.org/spec/v2.0.0
### Added
- Security-model discoverability (CASSANDRA-21464)
+- Improve host_source locking and ring refresh concurrency (CASSGO-121)
## [2.1.2]
diff --git a/host_source.go b/host_source.go
index e434b652..a0e860f2 100644
--- a/host_source.go
+++ b/host_source.go
@@ -453,6 +453,14 @@ func (h *HostInfo) IsUp() bool {
}
func (h *HostInfo) HostnameAndPort() string {
+ h.mu.RLock()
+ if h.hostname != "" {
+ s := net.JoinHostPort(h.hostname, strconv.Itoa(h.port))
+ h.mu.RUnlock()
+ return s
+ }
+ h.mu.RUnlock()
+
h.mu.Lock()
defer h.mu.Unlock()
if h.hostname == "" {
@@ -463,8 +471,8 @@ func (h *HostInfo) HostnameAndPort() string {
}
func (h *HostInfo) ConnectAddressAndPort() string {
- h.mu.Lock()
- defer h.mu.Unlock()
+ h.mu.RLock()
+ defer h.mu.RUnlock()
addr, _ := h.connectAddressLocked()
return net.JoinHostPort(addr.String(), strconv.Itoa(h.port))
}
@@ -484,10 +492,7 @@ func (h *HostInfo) String() string {
// Polls system.peers at a specific interval to find new hosts
type ringDescriber struct {
- session *Session
- mu sync.Mutex
- prevHosts []*HostInfo
- prevPartitioner string
+ session *Session
}
// Returns true if we are using system_schema.keyspaces instead of
system.schema_keyspaces
@@ -806,7 +811,7 @@ func (r *ringDescriber) getClusterPeerInfo(localHost
*HostInfo) ([]*HostInfo, er
// Return true if the host is a valid peer
func isValidPeer(host *HostInfo) bool {
- return !(len(host.RPCAddress()) == 0 ||
+ return !(len(host.rpcAddress) == 0 ||
host.hostId == "" ||
host.dataCenter == "" ||
host.missingRack ||
@@ -815,17 +820,14 @@ func isValidPeer(host *HostInfo) bool {
// GetHosts returns a list of hosts found via queries to system.local and
system.peers
func (r *ringDescriber) GetHosts() ([]*HostInfo, string, error) {
- r.mu.Lock()
- defer r.mu.Unlock()
-
localHost, err := r.getLocalHostInfo()
if err != nil {
- return r.prevHosts, r.prevPartitioner, err
+ return nil, "", err
}
peerHosts, err := r.getClusterPeerInfo(localHost)
if err != nil {
- return r.prevHosts, r.prevPartitioner, err
+ return nil, "", err
}
hosts := append([]*HostInfo{localHost}, peerHosts...)
@@ -1048,13 +1050,13 @@ func (b *errorBroadcaster) newListener() <-chan error {
func (b *errorBroadcaster) broadcast(err error) {
b.mu.Lock()
- defer b.mu.Unlock()
curListeners := b.listeners
- if len(curListeners) > 0 {
- b.listeners = nil
- } else {
+ if len(curListeners) == 0 {
+ b.mu.Unlock()
return
}
+ b.listeners = nil
+ b.mu.Unlock()
for _, listener := range curListeners {
listener <- err
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]