This is an automated email from the ASF dual-hosted git repository.

crossoverjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new fb805c0a [Issue 1259][producer] Prevent panic when calling Flush on closed producer (#1260)
fb805c0a is described below

commit fb805c0aea3506ba9df87bb066221469e35005a0
Author: Gaylor Bosson <[email protected]>
AuthorDate: Mon Jul 29 08:09:00 2024 +0200

    [Issue 1259][producer] Prevent panic when calling Flush on closed producer (#1260)
---
 pulsar/producer_partition.go |  4 ++++
 pulsar/producer_test.go      | 15 +++++++++++++++
 2 files changed, 19 insertions(+)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index cd89862a..5c038aa5 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1494,6 +1494,10 @@ func (p *partitionProducer) Flush() error {
 }
 
 func (p *partitionProducer) FlushWithCtx(ctx context.Context) error {
+       if p.getProducerState() != producerReady {
+               return ErrProducerClosed
+       }
+
        flushReq := &flushRequest{
                doneCh: make(chan struct{}),
                err:    nil,
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 0e38d637..5b23182d 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -523,6 +523,21 @@ func TestFlushInPartitionedProducer(t *testing.T) {
        assert.Equal(t, msgCount, numOfMessages/2)
 }
 
+func TestProducerReturnsErrorOnFlushWhenClosed(t *testing.T) {
+       client, err := NewClient(ClientOptions{URL: serviceURL})
+       assert.NoError(t, err)
+       defer client.Close()
+
+       producer, err := client.CreateProducer(ProducerOptions{Topic: newTopicName()})
+       assert.NoError(t, err)
+       assert.NotNil(t, producer)
+
+       producer.Close()
+
+       err = producer.FlushWithCtx(context.Background())
+       assert.Error(t, err)
+}
+
 func TestRoundRobinRouterPartitionedProducer(t *testing.T) {
        topicName := "public/default/partition-testRoundRobinRouterPartitionedProducer"
        numberOfPartitions := 5

Reply via email to