This is an automated email from the ASF dual-hosted git repository.

joao-r-reis pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-gocql-driver.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 590aabed Fix repeated "Pool connection error" and reconnections with small Session.Timeout
590aabed is described below

commit 590aabedfaa33398e308e253e0bda020a8dbcafe
Author: João Reis <[email protected]>
AuthorDate: Tue Jun 2 19:06:44 2026 +0100

    Fix repeated "Pool connection error" and reconnections with small Session.Timeout
    
    In GoCQL v1.x, the read deadline is cleared (set to the zero time.Time) before the header of a new response frame is read, so a connection that sits idle waiting for the next frame is not closed by a read timeout.
    
    In v2.0.0, a regression caused the read deadline to be set to clusterCfg.Timeout on every read operation, including the read that waits for the next frame header. As a result, connections that are idle for longer than the timeout constantly error out and reconnect.
    
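    This patch fixes the regression by restoring the 1.x behavior: frame headers and incoming segments are read without a deadline, while frame bodies are still read with the configured timeout. Request timers now use a dedicated requestTimeout field instead of the connection reader's timeout. A follow-up ticket (CASSGO-127) will rework the timeout configuration so users can tune the read and write timeouts independently of the request timeout.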
    
    Patch by João Reis; reviewed by Bohdan Siryk for CASSGO-125
---
 CHANGELOG.md        |  1 +
 conn.go             | 35 +++++++++++++++++++++++++++++------
 conn_test.go        | 25 ++++++++++++++++++-------
 control.go          |  3 ++-
 integration_test.go | 51 +++++++++++++++++++++++++++++++++++++++++++++++++++
 5 files changed, 101 insertions(+), 14 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 21bfbd2e..cb07d57d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -11,6 +11,7 @@ and this project adheres to [Semantic 
Versioning](https://semver.org/spec/v2.0.0
 
 - Prevent panic when using a HostFilter and keyspace is not replicated to 
every DC (CASSGO-122)
 - system.peers fallback doesn't work in some scenarios (CASSGO-126)
+- Repeated "Pool connection error" messages and reconnections with small Session.Timeout (CASSGO-125)
 
 ## [2.1.1]
 
diff --git a/conn.go b/conn.go
index 9d36cbc3..28b01577 100644
--- a/conn.go
+++ b/conn.go
@@ -171,6 +171,7 @@ type Conn struct {
        w contextWriter
 
        writeTimeout   time.Duration
+       requestTimeout time.Duration // Request timeout, used for setting up 
request timers
        cfg            *ConnConfig
        frameObserver  FrameHeaderObserver
        streamObserver StreamObserver
@@ -279,6 +280,7 @@ func (s *Session) dialWithoutObserver(ctx context.Context, 
host *HostInfo, cfg *
                logger:         logger,
                streamObserver: s.streamObserver,
                writeTimeout:   writeTimeout,
+               requestTimeout: cfg.ConnectTimeout,
        }
 
        if err := c.init(ctx, dialedHost); err != nil {
@@ -312,6 +314,7 @@ func (c *Conn) init(ctx context.Context, dialedHost 
*DialedHost) error {
        }
 
        c.r.SetTimeout(c.cfg.Timeout)
+       c.requestTimeout = c.cfg.Timeout
 
        // dont coalesce startup frames
        if c.session.cfg.WriteCoalesceWaitTime > 0 && !c.cfg.disableCoalesce && 
!dialedHost.DisableCoalesce {
@@ -335,8 +338,8 @@ type startupCoordinator struct {
 
 func (s *startupCoordinator) setupConn(ctx context.Context) error {
        var cancel context.CancelFunc
-       if s.conn.r.GetTimeout() > 0 {
-               ctx, cancel = context.WithTimeout(ctx, s.conn.r.GetTimeout())
+       if s.conn.requestTimeout > 0 {
+               ctx, cancel = context.WithTimeout(ctx, s.conn.requestTimeout)
        } else {
                ctx, cancel = context.WithCancel(ctx)
        }
@@ -688,8 +691,9 @@ func (c *Conn) processFrame(ctx context.Context, r 
io.Reader) error {
 
        // read a full header, ignore timeouts, as this is being ran in a loop
        // TODO: TCP level deadlines? or just query level deadlines?
-       if c.r.GetTimeout() > 0 {
-               c.r.SetReadDeadline(time.Time{})
+       readTimeout := c.r.GetTimeout()
+       if readTimeout > 0 {
+               c.r.SetTimeout(0)
        }
 
        headStartTime := time.Now()
@@ -700,6 +704,11 @@ func (c *Conn) processFrame(ctx context.Context, r 
io.Reader) error {
                return err
        }
 
+       // Set timeout back for reading frame body
+       if readTimeout > 0 {
+               c.r.SetTimeout(readTimeout)
+       }
+
        if c.frameObserver != nil {
                c.frameObserver.ObserveFrameHeader(context.Background(), 
ObservedFrameHeader{
                        Version: protoVersion(head.version),
@@ -802,12 +811,24 @@ func (c *Conn) recvSegment(ctx context.Context) error {
                err             error
        )
 
+       // Read segment without timeout, as this is being run in a loop waiting 
for the next segment
+       readTimeout := c.r.GetTimeout()
+       if readTimeout > 0 {
+               c.r.SetTimeout(0)
+       }
+
        // Read frame based on compression
        if c.compressor != nil {
                frame, isSelfContained, err = readCompressedSegment(c.r, 
c.compressor)
        } else {
                frame, isSelfContained, err = readUncompressedSegment(c.r)
        }
+
+       // Restore timeout for subsequent segment reads in multi-segment frames
+       if readTimeout > 0 {
+               c.r.SetTimeout(readTimeout)
+       }
+
        if err != nil {
                return err
        }
@@ -908,6 +929,8 @@ func (c *connReader) Read(p []byte) (n int, err error) {
                var nn int
                if c.timeout > 0 {
                        c.conn.SetReadDeadline(time.Now().Add(c.timeout))
+               } else if c.timeout == 0 {
+                       c.conn.SetReadDeadline(time.Time{})
                }
 
                nn, err = io.ReadFull(c.r, p[n:])
@@ -1309,7 +1332,7 @@ func (c *Conn) execInternal(ctx context.Context, req 
frameBuilder, tracer Tracer
        }
 
        var timeoutCh <-chan time.Time
-       if timeout := c.r.GetTimeout(); timeout > 0 {
+       if c.requestTimeout > 0 {
                if call.timer == nil {
                        call.timer = time.NewTimer(0)
                        <-call.timer.C
@@ -1322,7 +1345,7 @@ func (c *Conn) execInternal(ctx context.Context, req 
frameBuilder, tracer Tracer
                        }
                }
 
-               call.timer.Reset(timeout)
+               call.timer.Reset(c.requestTimeout)
                timeoutCh = call.timer.C
        }
 
diff --git a/conn_test.go b/conn_test.go
index ad4e66e5..abc659b5 100644
--- a/conn_test.go
+++ b/conn_test.go
@@ -287,7 +287,9 @@ func TestTimeout(t *testing.T) {
        srv := NewTestServer(t, defaultProto, ctx)
        defer srv.Stop()
 
-       db, err := newTestSession(defaultProto, srv.Address)
+       cluster := testCluster(defaultProto, srv.Address)
+       cluster.Timeout = 2 * time.Second
+       db, err := cluster.CreateSession()
        if err != nil {
                t.Fatalf("NewCluster: %v", err)
        }
@@ -306,12 +308,18 @@ func TestTimeout(t *testing.T) {
                }
        }()
 
-       if err := db.Query("kill").WithContext(ctx).Exec(); err == nil {
+       now := time.Now()
+       err = db.Query("timeout").ExecContext(ctx)
+       if err == nil {
                t.Fatal("expected error got nil")
        }
        cancel()
-
        wg.Wait()
+
+       elapsed := time.Since(now)
+       if elapsed < 1*time.Second || elapsed > 4*time.Second {
+               t.Fatalf("timeout is not respected (took %v)", elapsed.String())
+       }
 }
 
 func TestCancel(t *testing.T) {
@@ -329,13 +337,11 @@ func TestCancel(t *testing.T) {
        }
        defer db.Close()
 
-       qry := db.Query("timeout").WithContext(ctx)
-
        // Make sure we finish the query without leftovers
        var wg sync.WaitGroup
        wg.Add(1)
        go func() {
-               err = qry.Exec()
+               err = db.Query("timeout").ExecContext(ctx)
                wg.Done()
        }()
 
@@ -720,9 +726,14 @@ func TestStream0(t *testing.T) {
                t.Fatal(err)
        }
 
+       clientConn, serverConn := net.Pipe()
+       defer clientConn.Close()
+       defer serverConn.Close()
+
        conn := &Conn{
                r: &connReader{
-                       r: bufio.NewReader(&buf),
+                       r:    bufio.NewReader(&buf),
+                       conn: clientConn,
                },
                streams: streams.New(protoVersion4),
                session: &Session{
diff --git a/control.go b/control.go
index cc21e089..9f56958a 100644
--- a/control.go
+++ b/control.go
@@ -361,7 +361,8 @@ func (c *controlConn) setupConn(conn *Conn, sessionInit 
bool) error {
        c.conn.Store(ch)
 
        c.session.logger.Info("Control connection connected to host.",
-               NewLogFieldIP("host_addr", host.ConnectAddress()), 
NewLogFieldString("host_id", host.HostID()))
+               NewLogFieldIP("host_addr", host.ConnectAddress()), 
NewLogFieldString("host_id", host.HostID()),
+               NewLogFieldInt("protocol_version", c.session.cfg.ProtoVersion))
 
        if c.session.initialized() {
                refreshErr := c.session.schemaDescriber.refreshSchemaMetadata()
diff --git a/integration_test.go b/integration_test.go
index bd6ccb5c..8bd2e94c 100644
--- a/integration_test.go
+++ b/integration_test.go
@@ -977,3 +977,54 @@ func TestSliceMapMapScanCollectionTypes(t *testing.T) {
                })
        }
 }
+
+// TestSmallTimeoutNoPoolErrors verifies that small Session.Timeout values
+// don't cause connections to timeout and reconnect constantly. This is a
+// regression test for 
https://github.com/apache/cassandra-gocql-driver/issues/1919
+//
+// The issue was that the timeout was being applied to frame header reads,
+// causing connections to timeout while waiting for the next frame. The fix
+// ensures frame headers are read without timeout, while frame bodies are
+// read with timeout.
+func TestSmallTimeoutNoPoolErrors(t *testing.T) {
+       // Create a test logger to capture log messages
+       logger := newTestLogger(LogLevelDebug)
+       defer func() {
+               t.Log(logger.String())
+       }()
+
+       cluster := createCluster()
+       cluster.ConnectTimeout = 10 * time.Second
+       cluster.Timeout = 750 * time.Millisecond
+       cluster.NumConns = 1
+       cluster.Logger = logger
+
+       db, err := cluster.CreateSession()
+       if err != nil {
+               t.Fatalf("CreateSession: %v", err)
+       }
+       defer db.Close()
+
+       // Wait for connections to sit idle
+       // If the bug exists, connections will timeout while waiting for frame 
headers
+       // and "Pool connection error" messages will be logged repeatedly
+       time.Sleep(5 * time.Second)
+
+       // Get log output for analysis
+       logOutput := strings.ToLower(logger.String())
+
+       // Count successful connection messages - should be exactly NumConns * 
number of nodes
+       connectedCount := strings.Count(logOutput, "pool connected to node")
+       if connectedCount != *clusterSize*cluster.NumConns {
+               t.Fatalf("Expected exactly %d 'Pool connected to node' 
messages, got %d:\n%s",
+                       *clusterSize*cluster.NumConns, connectedCount, 
logOutput)
+       }
+
+       // Count error messages - should be zero
+       // With the bug, we'd see many errors as connections constantly timeout 
and reconnect
+       errorCount := strings.Count(logOutput, "pool connection error")
+       if errorCount > 0 {
+               t.Fatalf("Found %d 'Pool connection error' messages - 
connections are timing out and reconnecting:\n%s",
+                       errorCount, logOutput)
+       }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to