This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 85bbf45  [Issue #123] Ensure message is sent if no room in current batch. (#124)
85bbf45 is described below

commit 85bbf458b960be81d236bd58d424f189d8934d71
Author: cckellogg <cckell...@gmail.com>
AuthorDate: Mon Dec 16 20:35:53 2019 -0800

    [Issue #123] Ensure message is sent if no room in current batch. (#124)
    
    * [Issue #123] Ensure message is sent if no room in current batch.
    
    * Add issue to test comment.
    
    * Increase test wait timeout.
    
    * Add timeout log for test.
    
    * Add log when a single message fails to be added to batch.
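    
    For reference, the heart of the fix is a flush-and-retry pattern: when a
    message does not fit into the current batch, the batch is flushed and the
    add is attempted once more before a failure is logged. Before this change,
    a message that failed the first Add triggered a flush but was never
    re-added, so it was never sent. A minimal, self-contained Go sketch of the
    pattern follows; the batch type and its Add/Flush methods are hypothetical
    stand-ins for illustration, not the client's real batchBuilder API.
    
        package main
        
        import "fmt"
        
        // batch is a toy container with a fixed byte budget.
        type batch struct {
                limit int
                used  int
        }
        
        // Add reports whether the payload fit within the remaining budget.
        func (b *batch) Add(payload []byte) bool {
                if b.used+len(payload) > b.limit {
                        return false
                }
                b.used += len(payload)
                return true
        }
        
        // Flush empties the batch, making the full budget available again.
        func (b *batch) Flush() {
                fmt.Printf("flushing %d bytes\n", b.used)
                b.used = 0
        }
        
        // send mirrors the patched logic: add, and on failure flush and retry.
        // A second failure means the message can never fit, so it is logged.
        func send(b *batch, payload []byte) {
                if !b.Add(payload) {
                        b.Flush()
                        if !b.Add(payload) {
                                fmt.Printf("unable to add %d-byte message to batch\n", len(payload))
                        }
                }
        }
        
        func main() {
                b := &batch{limit: 16}
                send(b, make([]byte, 10)) // fits in the empty batch
                send(b, make([]byte, 10)) // no room left: forces a flush, then fits
                send(b, make([]byte, 32)) // larger than the whole budget: logged
        }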
---
 pulsar/consumer_test.go              |  3 +-
 pulsar/negative_acks_tracker_test.go |  8 ++---
 pulsar/producer_partition.go         | 19 +++++++++--
 pulsar/producer_test.go              | 63 ++++++++++++++++++++++++++++++++++++
 4 files changed, 85 insertions(+), 8 deletions(-)

diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 5ff734c..c2a2073 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -787,7 +787,8 @@ func TestConsumerMetadata(t *testing.T) {
                t.Fatal(err)
        }
        subs := stats["subscriptions"].(map[string]interface{})
-       meta := subs["my-sub"].(map[string]interface{})["consumers"].([]interface{})[0].(map[string]interface{})["metadata"].(map[string]interface{})
+       cons := subs["my-sub"].(map[string]interface{})["consumers"].([]interface{})[0].(map[string]interface{})
+       meta := cons["metadata"].(map[string]interface{})
        assert.Equal(t, len(props), len(meta))
        for k, v := range props {
                mv := meta[k].(string)
diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go
index 4dac66e..93c1af2 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -29,9 +29,9 @@ import (
 const testNackDelay = 300 * time.Millisecond
 
 type nackMockedConsumer struct {
-       ch chan messageID
+       ch     chan messageID
        closed bool
-       lock sync.Mutex
+       lock   sync.Mutex
        msgIds []messageID
 }
 
@@ -42,7 +42,7 @@ func newNackMockedConsumer() *nackMockedConsumer {
        go func() {
                // since the client ticks at an interval of delay / 3
                // wait another interval to ensure we get all messages
-               time.Sleep(testNackDelay + 101 * time.Millisecond)
+               time.Sleep(testNackDelay + 101*time.Millisecond)
                t.lock.Lock()
                defer t.lock.Unlock()
                t.closed = true
@@ -69,7 +69,7 @@ func sortMessageIds(msgIds []messageID) []messageID {
        return msgIds
 }
 
-func (nmc *nackMockedConsumer) Wait() <- chan messageID {
+func (nmc *nackMockedConsumer) Wait() <-chan messageID {
        return nmc.ch
 }
 
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index c5345cb..b3b8937 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -256,14 +256,27 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
        }
 
        if sendAsBatch {
-               ok := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters)
-               if ok == false {
+               added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters)
+               if !added {
                        // The current batch is full.. flush it and retry
                        p.internalFlushCurrentBatch()
+
+                       // after flushing try again to add the current payload
+                       if ok := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters); !ok {
+                               p.log.WithField("size", len(msg.Payload)).
+                                       WithField("sequenceID", sequenceID).
+                                       WithField("properties", msg.Properties).
+                                       Error("unable to add message to batch")
+                       }
                }
        } else {
                // Send individually
-               p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters)
+               if added := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters); !added {
+                       p.log.WithField("size", len(msg.Payload)).
+                               WithField("sequenceID", sequenceID).
+                               WithField("properties", msg.Properties).
+                               Error("unable to send single message")
+               }
                p.internalFlushCurrentBatch()
        }
 
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index bc3f8ea..4fcc011 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -558,3 +558,66 @@ func TestProducerMetadata(t *testing.T) {
                assert.Equal(t, v, mv)
        }
 }
+
+// test for issues #76, #114 and #123
+func TestBatchMessageFlushing(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       if err != nil {
+               t.Fatal(err)
+       }
+       defer client.Close()
+
+       topic := newTopicName()
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topic,
+       })
+       if err != nil {
+               t.Fatal(err)
+       }
+       defer producer.Close()
+
+       maxBytes := internal.MaxBatchSize
+       genbytes := func(n int) []byte {
+               c := []byte("a")[0]
+               bytes := make([]byte, n)
+               for i := 0; i < n; i++ {
+                       bytes[i] = c
+               }
+               return bytes
+       }
+
+       msgs := [][]byte{
+               genbytes(maxBytes - 10),
+               genbytes(11),
+       }
+
+       ch := make(chan struct{}, 2)
+       ctx := context.Background()
+       for _, msg := range msgs {
+               msg := &ProducerMessage{
+                       Payload: msg,
+               }
+               producer.SendAsync(ctx, msg, func(id MessageID, producerMessage *ProducerMessage, err error) {
+                       ch <- struct{}{}
+               })
+       }
+
+       published := 0
+       keepGoing := true
+       for keepGoing {
+               select {
+               case <-ch:
+                       published++
+                       if published == 2 {
+                               keepGoing = false
+                       }
+               case <-time.After(defaultBatchingMaxPublishDelay * 10):
+                       fmt.Println("TestBatchMessageFlushing timeout waiting to publish messages")
+                       keepGoing = false
+               }
+       }
+
+       assert.Equal(t, 2, published, "expected to publish two messages")
+}
