This is an automated email from the ASF dual-hosted git repository.
crossoverjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new fb805c0a [Issue 1259][producer] Prevent panic when calling Flush on closed producer (#1260)
fb805c0a is described below
commit fb805c0aea3506ba9df87bb066221469e35005a0
Author: Gaylor Bosson <[email protected]>
AuthorDate: Mon Jul 29 08:09:00 2024 +0200
[Issue 1259][producer] Prevent panic when calling Flush on closed producer (#1260)
---
pulsar/producer_partition.go | 4 ++++
pulsar/producer_test.go | 15 +++++++++++++++
2 files changed, 19 insertions(+)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index cd89862a..5c038aa5 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1494,6 +1494,10 @@ func (p *partitionProducer) Flush() error {
}
func (p *partitionProducer) FlushWithCtx(ctx context.Context) error {
+ if p.getProducerState() != producerReady {
+ return ErrProducerClosed
+ }
+
flushReq := &flushRequest{
doneCh: make(chan struct{}),
err: nil,
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 0e38d637..5b23182d 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -523,6 +523,21 @@ func TestFlushInPartitionedProducer(t *testing.T) {
assert.Equal(t, msgCount, numOfMessages/2)
}
+func TestProducerReturnsErrorOnFlushWhenClosed(t *testing.T) {
+ client, err := NewClient(ClientOptions{URL: serviceURL})
+ assert.NoError(t, err)
+ defer client.Close()
+
+ producer, err := client.CreateProducer(ProducerOptions{Topic: newTopicName()})
+ assert.NoError(t, err)
+ assert.NotNil(t, producer)
+
+ producer.Close()
+
+ err = producer.FlushWithCtx(context.Background())
+ assert.Error(t, err)
+}
+
func TestRoundRobinRouterPartitionedProducer(t *testing.T) {
topicName := "public/default/partition-testRoundRobinRouterPartitionedProducer"
numberOfPartitions := 5