This is an automated email from the ASF dual-hosted git repository.

crossoverjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bbb5b26 Fix close blocked (#1308)
7bbb5b26 is described below

commit 7bbb5b268232118d5520c8c5156da1fa6baf9d96
Author: Zixuan Liu <[email protected]>
AuthorDate: Mon Dec 9 16:03:44 2024 +0800

    Fix close blocked (#1308)
---
 pulsar/consumer_partition.go |  9 ++++++++-
 pulsar/consumer_test.go      | 43 +++++++++++++++++++++++++++++++++++++++++++
 pulsar/producer_partition.go |  8 +-------
 3 files changed, 52 insertions(+), 8 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 983a6e2c..dd770ce7 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -190,6 +190,8 @@ type partitionConsumer struct {
 
        dispatcherSeekingControlCh chan struct{}
        isSeeking                  atomic.Bool
+       ctx                        context.Context
+       cancelFunc                 context.CancelFunc
 }
 
 // pauseDispatchMessage used to discard the message in the dispatcher 
goroutine.
@@ -344,6 +346,7 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
                boFunc = backoff.NewDefaultBackoff
        }
 
+       ctx, cancelFunc := context.WithCancel(context.Background())
        pc := &partitionConsumer{
                parentConsumer:             parent,
                client:                     client,
@@ -367,6 +370,8 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
                schemaInfoCache:            newSchemaInfoCache(client, 
options.topic),
                backoffPolicyFunc:          boFunc,
                dispatcherSeekingControlCh: make(chan struct{}),
+               ctx:                        ctx,
+               cancelFunc:                 cancelFunc,
        }
        if pc.options.autoReceiverQueueSize {
                pc.currentQueueSize.Store(initialReceiverQueueSize)
@@ -938,6 +943,8 @@ func (pc *partitionConsumer) Close() {
                return
        }
 
+       pc.cancelFunc()
+
        // flush all pending ACK requests and terminate the timer goroutine
        pc.ackGroupingTracker.close()
 
@@ -1866,7 +1873,7 @@ func (pc *partitionConsumer) 
reconnectToBroker(connectionClosed *connectionClose
 
                return struct{}{}, err
        }
-       _, _ = internal.Retry(context.Background(), opFn, func(_ error) 
time.Duration {
+       _, _ = internal.Retry(pc.ctx, opFn, func(_ error) time.Duration {
                delayReconnectTime := bo.Next()
                pc.log.WithFields(log.Fields{
                        "assignedBrokerURL":  assignedBrokerURL,
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index e3bf9bfb..07b34ee5 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -30,6 +30,10 @@ import (
        "testing"
        "time"
 
+       "github.com/stretchr/testify/require"
+       "github.com/testcontainers/testcontainers-go"
+       "github.com/testcontainers/testcontainers-go/wait"
+
        "github.com/apache/pulsar-client-go/pulsar/backoff"
 
        "github.com/apache/pulsar-client-go/pulsaradmin"
@@ -4940,3 +4944,42 @@ func TestAckResponseNotBlocked(t *testing.T) {
                }
        }
 }
+
+func TestConsumerKeepReconnectingAndThenCallClose(t *testing.T) {
+       req := testcontainers.ContainerRequest{
+               Image:        getPulsarTestImage(),
+               ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+               WaitingFor:   wait.ForExposedPort(),
+               Cmd:          []string{"bin/pulsar", "standalone", "-nfw"},
+       }
+       c, err := testcontainers.GenericContainer(context.Background(), 
testcontainers.GenericContainerRequest{
+               ContainerRequest: req,
+               Started:          true,
+       })
+       require.NoError(t, err, "Failed to start the pulsar container")
+       endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+       require.NoError(t, err, "Failed to get the pulsar endpoint")
+
+       client, err := NewClient(ClientOptions{
+               URL:               endpoint,
+               ConnectionTimeout: 5 * time.Second,
+               OperationTimeout:  5 * time.Second,
+       })
+       require.NoError(t, err)
+       defer client.Close()
+
+       var testConsumer Consumer
+       require.Eventually(t, func() bool {
+               testConsumer, err = client.Subscribe(ConsumerOptions{
+                       Topic:            newTopicName(),
+                       Schema:           NewBytesSchema(nil),
+                       SubscriptionName: "test-sub",
+               })
+               return err == nil
+       }, 30*time.Second, 1*time.Second)
+       _ = c.Terminate(context.Background())
+       require.Eventually(t, func() bool {
+               testConsumer.Close()
+               return true
+       }, 30*time.Second, 1*time.Second)
+}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index c5ae259a..7694c967 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -491,12 +491,6 @@ func (p *partitionProducer) 
reconnectToBroker(connectionClosed *connectionClosed
                        return struct{}{}, nil
                }
 
-               select {
-               case <-p.ctx.Done():
-                       return struct{}{}, nil
-               default:
-               }
-
                if p.getProducerState() != producerReady {
                        // Producer is already closing
                        p.log.Info("producer state not ready, exit reconnect")
@@ -552,7 +546,7 @@ func (p *partitionProducer) 
reconnectToBroker(connectionClosed *connectionClosed
 
                return struct{}{}, err
        }
-       _, _ = internal.Retry(context.Background(), opFn, func(_ error) 
time.Duration {
+       _, _ = internal.Retry(p.ctx, opFn, func(_ error) time.Duration {
                delayReconnectTime := bo.Next()
                p.log.WithFields(log.Fields{
                        "assignedBrokerURL":  assignedBrokerURL,

Reply via email to