This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.13.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit 53fc9381499a0f8d88c97b10ad4b4670539cbc4d
Author: Gaylor Bosson <[email protected]>
AuthorDate: Mon Jul 29 08:09:00 2024 +0200

    [Issue 1259][producer] Prevent panic when calling Flush on closed producer (#1260)

    (cherry picked from commit fb805c0aea3506ba9df87bb066221469e35005a0)
---
 pulsar/producer_partition.go |  4 ++++
 pulsar/producer_test.go      | 15 +++++++++++++++
 2 files changed, 19 insertions(+)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index cd89862a..5c038aa5 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1494,6 +1494,10 @@ func (p *partitionProducer) Flush() error {
 }
 
 func (p *partitionProducer) FlushWithCtx(ctx context.Context) error {
+	if p.getProducerState() != producerReady {
+		return ErrProducerClosed
+	}
+
 	flushReq := &flushRequest{
 		doneCh: make(chan struct{}),
 		err:    nil,
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 0e38d637..5b23182d 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -523,6 +523,21 @@ func TestFlushInPartitionedProducer(t *testing.T) {
 	assert.Equal(t, msgCount, numOfMessages/2)
 }
 
+func TestProducerReturnsErrorOnFlushWhenClosed(t *testing.T) {
+	client, err := NewClient(ClientOptions{URL: serviceURL})
+	assert.NoError(t, err)
+	defer client.Close()
+
+	producer, err := client.CreateProducer(ProducerOptions{Topic: newTopicName()})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+
+	producer.Close()
+
+	err = producer.FlushWithCtx(context.Background())
+	assert.Error(t, err)
+}
+
 func TestRoundRobinRouterPartitionedProducer(t *testing.T) {
 	topicName := "public/default/partition-testRoundRobinRouterPartitionedProducer"
 	numberOfPartitions := 5
