This is an automated email from the ASF dual-hosted git repository.
crossoverjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 7bbb5b26 Fix close blocked (#1308)
7bbb5b26 is described below
commit 7bbb5b268232118d5520c8c5156da1fa6baf9d96
Author: Zixuan Liu <[email protected]>
AuthorDate: Mon Dec 9 16:03:44 2024 +0800
Fix close blocked (#1308)
---
pulsar/consumer_partition.go | 9 ++++++++-
pulsar/consumer_test.go | 43 +++++++++++++++++++++++++++++++++++++++++++
pulsar/producer_partition.go | 8 +-------
3 files changed, 52 insertions(+), 8 deletions(-)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 983a6e2c..dd770ce7 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -190,6 +190,8 @@ type partitionConsumer struct {
dispatcherSeekingControlCh chan struct{}
isSeeking atomic.Bool
+ ctx context.Context
+ cancelFunc context.CancelFunc
}
// pauseDispatchMessage used to discard the message in the dispatcher goroutine.
@@ -344,6 +346,7 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
boFunc = backoff.NewDefaultBackoff
}
+ ctx, cancelFunc := context.WithCancel(context.Background())
pc := &partitionConsumer{
parentConsumer: parent,
client: client,
@@ -367,6 +370,8 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
schemaInfoCache: newSchemaInfoCache(client, options.topic),
backoffPolicyFunc: boFunc,
dispatcherSeekingControlCh: make(chan struct{}),
+ ctx: ctx,
+ cancelFunc: cancelFunc,
}
if pc.options.autoReceiverQueueSize {
pc.currentQueueSize.Store(initialReceiverQueueSize)
@@ -938,6 +943,8 @@ func (pc *partitionConsumer) Close() {
return
}
+ pc.cancelFunc()
+
// flush all pending ACK requests and terminate the timer goroutine
pc.ackGroupingTracker.close()
@@ -1866,7 +1873,7 @@ func (pc *partitionConsumer) reconnectToBroker(connectionClosed *connectionClose
return struct{}{}, err
}
- _, _ = internal.Retry(context.Background(), opFn, func(_ error) time.Duration {
+ _, _ = internal.Retry(pc.ctx, opFn, func(_ error) time.Duration {
delayReconnectTime := bo.Next()
pc.log.WithFields(log.Fields{
"assignedBrokerURL": assignedBrokerURL,
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index e3bf9bfb..07b34ee5 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -30,6 +30,10 @@ import (
"testing"
"time"
+ "github.com/stretchr/testify/require"
+ "github.com/testcontainers/testcontainers-go"
+ "github.com/testcontainers/testcontainers-go/wait"
+
"github.com/apache/pulsar-client-go/pulsar/backoff"
"github.com/apache/pulsar-client-go/pulsaradmin"
@@ -4940,3 +4944,42 @@ func TestAckResponseNotBlocked(t *testing.T) {
}
}
}
+
+func TestConsumerKeepReconnectingAndThenCallClose(t *testing.T) {
+ req := testcontainers.ContainerRequest{
+ Image: getPulsarTestImage(),
+ ExposedPorts: []string{"6650/tcp", "8080/tcp"},
+ WaitingFor: wait.ForExposedPort(),
+ Cmd: []string{"bin/pulsar", "standalone", "-nfw"},
+ }
+ c, err := testcontainers.GenericContainer(context.Background(), testcontainers.GenericContainerRequest{
+ ContainerRequest: req,
+ Started: true,
+ })
+ require.NoError(t, err, "Failed to start the pulsar container")
+ endpoint, err := c.PortEndpoint(context.Background(), "6650", "pulsar")
+ require.NoError(t, err, "Failed to get the pulsar endpoint")
+
+ client, err := NewClient(ClientOptions{
+ URL: endpoint,
+ ConnectionTimeout: 5 * time.Second,
+ OperationTimeout: 5 * time.Second,
+ })
+ require.NoError(t, err)
+ defer client.Close()
+
+ var testConsumer Consumer
+ require.Eventually(t, func() bool {
+ testConsumer, err = client.Subscribe(ConsumerOptions{
+ Topic: newTopicName(),
+ Schema: NewBytesSchema(nil),
+ SubscriptionName: "test-sub",
+ })
+ return err == nil
+ }, 30*time.Second, 1*time.Second)
+ _ = c.Terminate(context.Background())
+ require.Eventually(t, func() bool {
+ testConsumer.Close()
+ return true
+ }, 30*time.Second, 1*time.Second)
+}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index c5ae259a..7694c967 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -491,12 +491,6 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed
return struct{}{}, nil
}
- select {
- case <-p.ctx.Done():
- return struct{}{}, nil
- default:
- }
-
if p.getProducerState() != producerReady {
// Producer is already closing
p.log.Info("producer state not ready, exit reconnect")
@@ -552,7 +546,7 @@ func (p *partitionProducer) reconnectToBroker(connectionClosed *connectionClosed
return struct{}{}, err
}
- _, _ = internal.Retry(context.Background(), opFn, func(_ error) time.Duration {
+ _, _ = internal.Retry(p.ctx, opFn, func(_ error) time.Duration {
delayReconnectTime := bo.Next()
p.log.WithFields(log.Fields{
"assignedBrokerURL": assignedBrokerURL,