This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch branch-0.11.0
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit 5dc2a4909e737ab8d0d92ddb26b4110cfbc70c2e
Author: Jiaqi Shen <[email protected]>
AuthorDate: Tue Jul 25 18:39:51 2023 +0800

    [fix] [issue 1064] Fix the panic when trying to flush with DisableBatching=true 
(#1065)
    
    Fixes #1064
    
    ### Motivation
    
    If a producer is created with `DisableBatching=true`, calling 
`producer.Flush()` panics. More details in #1064.
    
    ### Modifications
    
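    - Avoid the panic in non-batching producers by calling `internalFlushCurrentBatch()` from `internalFlush` only when `DisableBatching` is false.
    - Add a unit test covering `Flush()` on a non-batching producer.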
    
    (cherry picked from commit 4bfd4aae74efa83fd43400371731258c0afb5d74)
---
 pulsar/producer_partition.go |  4 +++-
 pulsar/producer_test.go      | 44 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 47 insertions(+), 1 deletion(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index bc379afb..ca813762 100755
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -1058,7 +1058,9 @@ func (p *partitionProducer) internalFlush(fr 
*flushRequest) {
                }
        }
 
-       p.internalFlushCurrentBatch()
+       if !p.options.DisableBatching {
+               p.internalFlushCurrentBatch()
+       }
 
        pi, ok := p.pendingQueue.PeekLast().(*pendingItem)
        if !ok {
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 1c7bfd8f..38fec576 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -180,6 +180,50 @@ func TestProducerAsyncSend(t *testing.T) {
        wg.Wait()
 }
 
+// TestProducerFlushDisableBatching verifies that Flush succeeds, rather than
+// panicking, on a producer created with DisableBatching set to true.
+func TestProducerFlushDisableBatching(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           newTopicName(),
+               DisableBatching: true,
+       })
+       assert.NoError(t, err)
+       assert.NotNil(t, producer)
+       defer producer.Close()
+
+       const numMessages = 10
+       wg := sync.WaitGroup{}
+       wg.Add(numMessages)
+       sendErrs := internal.NewBlockingQueue(numMessages)
+
+       for i := 0; i < numMessages; i++ {
+               // Publish failures are delivered to the callback and collected there.
+               producer.SendAsync(context.Background(), &ProducerMessage{
+                       Payload: []byte("hello"),
+               }, func(id MessageID, message *ProducerMessage, e error) {
+                       if e != nil {
+                               log.WithError(e).Error("Failed to publish")
+                               sendErrs.Put(e)
+                       } else {
+                               log.Info("Published message ", id)
+                       }
+                       wg.Done()
+               })
+       }
+
+       err = producer.Flush()
+       assert.NoError(t, err)
+
+       wg.Wait()
+       assert.Equal(t, 0, sendErrs.Size())
+}
+
 func TestProducerCompression(t *testing.T) {
        type testProvider struct {
                name            string

Reply via email to