This is an automated email from the ASF dual-hosted git repository. zike pushed a commit to branch branch-0.11.0 in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit 5dc2a4909e737ab8d0d92ddb26b4110cfbc70c2e Author: Jiaqi Shen <[email protected]> AuthorDate: Tue Jul 25 18:39:51 2023 +0800 [fix] [issue 1064] Fix the panic when try to flush in DisableBatching=true (#1065) Fixes #1064 ### Motivation If a producer is created with `DisableBatching=true`, calling `producer.Flush()` panics. More details in #1064. ### Modifications - Avoid panic in non-batching producer - Add unit test to cover `Flush()` in non-batching producer. (cherry picked from commit 4bfd4aae74efa83fd43400371731258c0afb5d74) --- pulsar/producer_partition.go | 4 +++- pulsar/producer_test.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index bc379afb..ca813762 100755 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -1058,7 +1058,9 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) { } } - p.internalFlushCurrentBatch() + if !p.options.DisableBatching { + p.internalFlushCurrentBatch() + } pi, ok := p.pendingQueue.PeekLast().(*pendingItem) if !ok { diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go index 1c7bfd8f..38fec576 100644 --- a/pulsar/producer_test.go +++ b/pulsar/producer_test.go @@ -180,6 +180,50 @@ func TestProducerAsyncSend(t *testing.T) { wg.Wait() } +func TestProducerFlushDisableBatching(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.NoError(t, err) + defer client.Close() + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: newTopicName(), + DisableBatching: true, + }) + + assert.NoError(t, err) + assert.NotNil(t, producer) + defer producer.Close() + + wg := sync.WaitGroup{} + wg.Add(10) + errors := internal.NewBlockingQueue(10) + + for i := 0; i < 10; i++ { + producer.SendAsync(context.Background(), &ProducerMessage{ + Payload: []byte("hello"), + }, func(id MessageID, message *ProducerMessage, e error) { + if e !=
nil { + log.WithError(e).Error("Failed to publish") + errors.Put(e) + } else { + log.Info("Published message ", id) + } + wg.Done() + }) + + assert.NoError(t, err) + } + + err = producer.Flush() + assert.Nil(t, err) + + wg.Wait() + + assert.Equal(t, 0, errors.Size()) +} + func TestProducerCompression(t *testing.T) { type testProvider struct { name string
