This is an automated email from the ASF dual-hosted git repository.
joao-r-reis pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-gocql-driver.git
The following commit(s) were added to refs/heads/trunk by this push:
new 590aabed Fix repeated "Pool connection error" and reconnections with
small Session.Timeout
590aabed is described below
commit 590aabedfaa33398e308e253e0bda020a8dbcafe
Author: João Reis <[email protected]>
AuthorDate: Tue Jun 2 19:06:44 2026 +0100
Fix repeated "Pool connection error" and reconnections with small
Session.Timeout
In GoCQL v1.x, the read deadline is set to 0 when reading a new response
frame to prevent the connection from closing when it idles.
In v2.0.0, a regression was introduced that causes the read deadline to be set
to clusterCfg.Timeout for every read operation, which leads to connections
constantly erroring and reconnecting if they idle.
This patch fixes this regression by implementing the behavior of 1.x. A
follow-up ticket (CASSGO-127) will rework the timeout configuration so users
can tune the read and write timeouts independently from the request timeout.
Patch by João Reis; reviewed by Bohdan Siryk for CASSGO-125
---
CHANGELOG.md | 1 +
conn.go | 35 +++++++++++++++++++++++++++++------
conn_test.go | 25 ++++++++++++++++++-------
control.go | 3 ++-
integration_test.go | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++
5 files changed, 101 insertions(+), 14 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 21bfbd2e..cb07d57d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,7 @@ and this project adheres to [Semantic
Versioning](https://semver.org/spec/v2.0.0
- Prevent panic when using a HostFilter and keyspace is not replicated to
every DC (CASSGO-122)
- system.peers fallback doesn't work in some scenarios (CASSGO-126)
+- Many "Pool connection error" with small Session.Timeout (CASSGO-125)
## [2.1.1]
diff --git a/conn.go b/conn.go
index 9d36cbc3..28b01577 100644
--- a/conn.go
+++ b/conn.go
@@ -171,6 +171,7 @@ type Conn struct {
w contextWriter
writeTimeout time.Duration
+ requestTimeout time.Duration // Request timeout, used for setting up
request timers
cfg *ConnConfig
frameObserver FrameHeaderObserver
streamObserver StreamObserver
@@ -279,6 +280,7 @@ func (s *Session) dialWithoutObserver(ctx context.Context,
host *HostInfo, cfg *
logger: logger,
streamObserver: s.streamObserver,
writeTimeout: writeTimeout,
+ requestTimeout: cfg.ConnectTimeout,
}
if err := c.init(ctx, dialedHost); err != nil {
@@ -312,6 +314,7 @@ func (c *Conn) init(ctx context.Context, dialedHost
*DialedHost) error {
}
c.r.SetTimeout(c.cfg.Timeout)
+ c.requestTimeout = c.cfg.Timeout
// dont coalesce startup frames
if c.session.cfg.WriteCoalesceWaitTime > 0 && !c.cfg.disableCoalesce &&
!dialedHost.DisableCoalesce {
@@ -335,8 +338,8 @@ type startupCoordinator struct {
func (s *startupCoordinator) setupConn(ctx context.Context) error {
var cancel context.CancelFunc
- if s.conn.r.GetTimeout() > 0 {
- ctx, cancel = context.WithTimeout(ctx, s.conn.r.GetTimeout())
+ if s.conn.requestTimeout > 0 {
+ ctx, cancel = context.WithTimeout(ctx, s.conn.requestTimeout)
} else {
ctx, cancel = context.WithCancel(ctx)
}
@@ -688,8 +691,9 @@ func (c *Conn) processFrame(ctx context.Context, r
io.Reader) error {
// read a full header, ignore timeouts, as this is being ran in a loop
// TODO: TCP level deadlines? or just query level deadlines?
- if c.r.GetTimeout() > 0 {
- c.r.SetReadDeadline(time.Time{})
+ readTimeout := c.r.GetTimeout()
+ if readTimeout > 0 {
+ c.r.SetTimeout(0)
}
headStartTime := time.Now()
@@ -700,6 +704,11 @@ func (c *Conn) processFrame(ctx context.Context, r
io.Reader) error {
return err
}
+ // Set timeout back for reading frame body
+ if readTimeout > 0 {
+ c.r.SetTimeout(readTimeout)
+ }
+
if c.frameObserver != nil {
c.frameObserver.ObserveFrameHeader(context.Background(),
ObservedFrameHeader{
Version: protoVersion(head.version),
@@ -802,12 +811,24 @@ func (c *Conn) recvSegment(ctx context.Context) error {
err error
)
+ // Read segment without timeout, as this is being run in a loop waiting
for the next segment
+ readTimeout := c.r.GetTimeout()
+ if readTimeout > 0 {
+ c.r.SetTimeout(0)
+ }
+
// Read frame based on compression
if c.compressor != nil {
frame, isSelfContained, err = readCompressedSegment(c.r,
c.compressor)
} else {
frame, isSelfContained, err = readUncompressedSegment(c.r)
}
+
+ // Restore timeout for subsequent segment reads in multi-segment frames
+ if readTimeout > 0 {
+ c.r.SetTimeout(readTimeout)
+ }
+
if err != nil {
return err
}
@@ -908,6 +929,8 @@ func (c *connReader) Read(p []byte) (n int, err error) {
var nn int
if c.timeout > 0 {
c.conn.SetReadDeadline(time.Now().Add(c.timeout))
+ } else if c.timeout == 0 {
+ c.conn.SetReadDeadline(time.Time{})
}
nn, err = io.ReadFull(c.r, p[n:])
@@ -1309,7 +1332,7 @@ func (c *Conn) execInternal(ctx context.Context, req
frameBuilder, tracer Tracer
}
var timeoutCh <-chan time.Time
- if timeout := c.r.GetTimeout(); timeout > 0 {
+ if c.requestTimeout > 0 {
if call.timer == nil {
call.timer = time.NewTimer(0)
<-call.timer.C
@@ -1322,7 +1345,7 @@ func (c *Conn) execInternal(ctx context.Context, req
frameBuilder, tracer Tracer
}
}
- call.timer.Reset(timeout)
+ call.timer.Reset(c.requestTimeout)
timeoutCh = call.timer.C
}
diff --git a/conn_test.go b/conn_test.go
index ad4e66e5..abc659b5 100644
--- a/conn_test.go
+++ b/conn_test.go
@@ -287,7 +287,9 @@ func TestTimeout(t *testing.T) {
srv := NewTestServer(t, defaultProto, ctx)
defer srv.Stop()
- db, err := newTestSession(defaultProto, srv.Address)
+ cluster := testCluster(defaultProto, srv.Address)
+ cluster.Timeout = 2 * time.Second
+ db, err := cluster.CreateSession()
if err != nil {
t.Fatalf("NewCluster: %v", err)
}
@@ -306,12 +308,18 @@ func TestTimeout(t *testing.T) {
}
}()
- if err := db.Query("kill").WithContext(ctx).Exec(); err == nil {
+ now := time.Now()
+ err = db.Query("timeout").ExecContext(ctx)
+ if err == nil {
t.Fatal("expected error got nil")
}
cancel()
-
wg.Wait()
+
+ elapsed := time.Since(now)
+ if elapsed < 1*time.Second || elapsed > 4*time.Second {
+ t.Fatalf("timeout is not respected (took %v)", elapsed.String())
+ }
}
func TestCancel(t *testing.T) {
@@ -329,13 +337,11 @@ func TestCancel(t *testing.T) {
}
defer db.Close()
- qry := db.Query("timeout").WithContext(ctx)
-
// Make sure we finish the query without leftovers
var wg sync.WaitGroup
wg.Add(1)
go func() {
- err = qry.Exec()
+ err = db.Query("timeout").ExecContext(ctx)
wg.Done()
}()
@@ -720,9 +726,14 @@ func TestStream0(t *testing.T) {
t.Fatal(err)
}
+ clientConn, serverConn := net.Pipe()
+ defer clientConn.Close()
+ defer serverConn.Close()
+
conn := &Conn{
r: &connReader{
- r: bufio.NewReader(&buf),
+ r: bufio.NewReader(&buf),
+ conn: clientConn,
},
streams: streams.New(protoVersion4),
session: &Session{
diff --git a/control.go b/control.go
index cc21e089..9f56958a 100644
--- a/control.go
+++ b/control.go
@@ -361,7 +361,8 @@ func (c *controlConn) setupConn(conn *Conn, sessionInit
bool) error {
c.conn.Store(ch)
c.session.logger.Info("Control connection connected to host.",
- NewLogFieldIP("host_addr", host.ConnectAddress()),
NewLogFieldString("host_id", host.HostID()))
+ NewLogFieldIP("host_addr", host.ConnectAddress()),
NewLogFieldString("host_id", host.HostID()),
+ NewLogFieldInt("protocol_version", c.session.cfg.ProtoVersion))
if c.session.initialized() {
refreshErr := c.session.schemaDescriber.refreshSchemaMetadata()
diff --git a/integration_test.go b/integration_test.go
index bd6ccb5c..8bd2e94c 100644
--- a/integration_test.go
+++ b/integration_test.go
@@ -977,3 +977,54 @@ func TestSliceMapMapScanCollectionTypes(t *testing.T) {
})
}
}
+
+// TestSmallTimeoutNoPoolErrors verifies that small Session.Timeout values
+// don't cause connections to timeout and reconnect constantly. This is a
+// regression test for
https://github.com/apache/cassandra-gocql-driver/issues/1919
+//
+// The issue was that the timeout was being applied to frame header reads,
+// causing connections to timeout while waiting for the next frame. The fix
+// ensures frame headers are read without timeout, while frame bodies are
+// read with timeout.
+func TestSmallTimeoutNoPoolErrors(t *testing.T) {
+ // Create a test logger to capture log messages
+ logger := newTestLogger(LogLevelDebug)
+ defer func() {
+ t.Log(logger.String())
+ }()
+
+ cluster := createCluster()
+ cluster.ConnectTimeout = 10 * time.Second
+ cluster.Timeout = 750 * time.Millisecond
+ cluster.NumConns = 1
+ cluster.Logger = logger
+
+ db, err := cluster.CreateSession()
+ if err != nil {
+ t.Fatalf("CreateSession: %v", err)
+ }
+ defer db.Close()
+
+ // Wait for connections to sit idle
+ // If the bug exists, connections will timeout while waiting for frame
headers
+ // and "Pool connection error" messages will be logged repeatedly
+ time.Sleep(5 * time.Second)
+
+ // Get log output for analysis
+ logOutput := strings.ToLower(logger.String())
+
+ // Count successful connection messages - should be exactly NumConns *
number of nodes
+ connectedCount := strings.Count(logOutput, "pool connected to node")
+ if connectedCount != *clusterSize*cluster.NumConns {
+ t.Fatalf("Expected exactly %d 'Pool connected to node'
messages, got %d:\n%s",
+ *clusterSize*cluster.NumConns, connectedCount,
logOutput)
+ }
+
+ // Count error messages - should be zero
+ // With the bug, we'd see many errors as connections constantly timeout
and reconnect
+ errorCount := strings.Count(logOutput, "pool connection error")
+ if errorCount > 0 {
+ t.Fatalf("Found %d 'Pool connection error' messages -
connections are timing out and reconnecting:\n%s",
+ errorCount, logOutput)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]