This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch branch-0.13.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit 53fc9381499a0f8d88c97b10ad4b4670539cbc4d
Author: Gaylor Bosson <[email protected]>
AuthorDate: Mon Jul 29 08:09:00 2024 +0200

    [Issue 1259][producer] Prevent panic when calling Flush on closed producer (#1260)
    
    (cherry picked from commit fb805c0aea3506ba9df87bb066221469e35005a0)
---
 pulsar/producer_partition.go |  4 ++++
 pulsar/producer_test.go      | 15 +++++++++++++++
 2 files changed, 19 insertions(+)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index cd89862a..5c038aa5 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1494,6 +1494,10 @@ func (p *partitionProducer) Flush() error {
 }
 
 func (p *partitionProducer) FlushWithCtx(ctx context.Context) error {
+       if p.getProducerState() != producerReady {
+               return ErrProducerClosed
+       }
+
        flushReq := &flushRequest{
                doneCh: make(chan struct{}),
                err:    nil,
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 0e38d637..5b23182d 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -523,6 +523,21 @@ func TestFlushInPartitionedProducer(t *testing.T) {
        assert.Equal(t, msgCount, numOfMessages/2)
 }
 
+func TestProducerReturnsErrorOnFlushWhenClosed(t *testing.T) {
+       client, err := NewClient(ClientOptions{URL: serviceURL})
+       assert.NoError(t, err)
+       defer client.Close()
+
+       producer, err := client.CreateProducer(ProducerOptions{Topic: newTopicName()})
+       assert.NoError(t, err)
+       assert.NotNil(t, producer)
+
+       producer.Close()
+
+       err = producer.FlushWithCtx(context.Background())
+       assert.Error(t, err)
+}
+
 func TestRoundRobinRouterPartitionedProducer(t *testing.T) {
        topicName := "public/default/partition-testRoundRobinRouterPartitionedProducer"
        numberOfPartitions := 5

Reply via email to