This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 448387d  Support partition consumer receive async and fix batch logic (#43)
448387d is described below

commit 448387d738a2f3af4c8232daa4fac9576d252617
Author: 冉小龙 <ranxiaolong...@gmail.com>
AuthorDate: Wed Aug 14 11:00:00 2019 +0800

    Support partition consumer receive async and fix batch logic (#43)
    
    Signed-off-by: xiaolong.ran ranxiaolong...@gmail.com
    
    * Support batch logic for project
    
    * add unit test case of event time
    
    * add some unit tests case for producer
    
    * fix error result type
    
    * add unit test case of producer flush
    
    * add receiver queue size test logic
    
    * support partition consumer receive async
    
    * add unit test case of ack timeout
    
    * Fix consumer receiving message out of order
---
 pulsar/consumer.go                |   3 +
 pulsar/consumer_test.go           | 301 +++++++++++++++++++++++++++++++++++++-
 pulsar/error.go                   |   2 +-
 pulsar/impl_consumer.go           |  55 +++++--
 pulsar/impl_partition_consumer.go | 217 ++++++++++++++++-----------
 pulsar/impl_partition_producer.go |  21 ++-
 pulsar/internal/commands.go       |  82 +++++++----
 pulsar/internal/connection.go     |  54 +++----
 pulsar/producer_test.go           | 260 +++++++++++++++++++++++++++++++-
 pulsar/unackedMsgTracker.go       |  20 +--
 util/util.go                      |  24 ++-
 util/util_test.go                 |  21 ++-
 12 files changed, 870 insertions(+), 190 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index de190e0..c259cd6 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -142,6 +142,9 @@ type Consumer interface {
        // ReceiveAsync appends the message to the msgs channel asynchronously.
        ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error
 
+       // ReceiveAsyncWithCallback returns a callback containing the message and error objects
+       ReceiveAsyncWithCallback(ctx context.Context, callback func(msg Message, err error))
+
        // Ack the consumption of a single message
        Ack(Message) error
 
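For reference, a minimal usage sketch of the two asynchronous receive paths exposed by the interface above: the channel-based ReceiveAsync and the new callback-based ReceiveAsyncWithCallback. The import path, broker URL, and topic are illustrative assumptions rather than part of this patch, and error handling is kept to the essentials.

    package main

    import (
            "context"
            "log"

            "github.com/apache/pulsar-client-go/pulsar" // assumed import path
    )

    func main() {
            // Assumed broker address; replace with a reachable Pulsar service URL.
            client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
            if err != nil {
                    log.Fatal(err)
            }
            defer client.Close()

            consumer, err := client.Subscribe(pulsar.ConsumerOptions{
                    Topic:            "persistent://public/default/example-topic",
                    SubscriptionName: "example-sub",
            })
            if err != nil {
                    log.Fatal(err)
            }
            defer consumer.Close()

            ctx := context.Background()

            // Channel-based path: ReceiveAsync keeps feeding a caller-owned channel,
            // so it is typically run in its own goroutine.
            msgs := make(chan pulsar.ConsumerMessage, 10)
            go func() {
                    if err := consumer.ReceiveAsync(ctx, msgs); err != nil {
                            log.Println("ReceiveAsync stopped:", err)
                    }
            }()
            cm := <-msgs
            log.Printf("channel path got %q", string(cm.Message.Payload()))
            _ = consumer.Ack(cm.Message)

            // Callback-based path: each call hands one message (or an error) to the callback.
            consumer.ReceiveAsyncWithCallback(ctx, func(msg pulsar.Message, err error) {
                    if err != nil {
                            log.Println("receive failed:", err)
                            return
                    }
                    log.Printf("callback path got %q", string(msg.Payload()))
                    _ = consumer.Ack(msg)
            })
    }
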
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 39646d3..6fe86cd 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -1,4 +1,3 @@
-//
 // Licensed to the Apache Software Foundation (ASF) under one
 // or more contributor license agreements.  See the NOTICE file
 // distributed with this work for additional information
@@ -15,7 +14,6 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-//
 
 package pulsar
 
@@ -124,6 +122,67 @@ func TestConsumerConnectError(t *testing.T) {
        assert.Equal(t, err.Error(), "connection error")
 }
 
+func TestBatchMessageReceive(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topicName := "persistent://public/default/receive-batch"
+       subName := "subscription-name"
+       prefix := "msg-batch-"
+       ctx := context.Background()
+
+       // Enable batching on producer side
+       batchSize, numOfMessages := 2, 100
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:               topicName,
+               BatchingMaxMessages: uint(batchSize),
+               DisableBatching:     false,
+               BlockIfQueueFull:    true,
+       })
+       assert.Nil(t, err)
+       assert.Equal(t, topicName, producer.Topic())
+       defer producer.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: subName,
+       })
+       assert.Equal(t, topicName, consumer.Topic())
+       count := 0
+
+       for i := 0; i < numOfMessages; i++ {
+               messageContent := prefix + fmt.Sprintf("%d", i)
+               msg := &ProducerMessage{
+                       Payload: []byte(messageContent),
+               }
+               err := producer.Send(ctx, msg)
+               assert.Nil(t, err)
+       }
+
+       for i := 0; i < numOfMessages; i++ {
+               msg, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               err = consumer.Ack(msg)
+               assert.Nil(t, err)
+               count++
+       }
+
+       // check strategically
+       for i := 0; i < 3; i++ {
+               if count == numOfMessages {
+                       break
+               }
+               time.Sleep(time.Second)
+       }
+       assert.Equal(t, count, numOfMessages)
+}
+
 func TestConsumerWithInvalidConf(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,
@@ -263,7 +322,7 @@ func TestConsumerKeyShared(t *testing.T) {
                assert.Nil(t, err)
        }
 
-       time.Sleep(time.Second * 5)
+       time.Sleep(time.Second * 1)
 
        go func() {
                for i := 0; i < 10; i++ {
@@ -288,6 +347,8 @@ func TestConsumerKeyShared(t *testing.T) {
                        }
                }
        }()
+
+       time.Sleep(time.Second * 1)
 }
 
 func TestPartitionTopicsConsumerPubSub(t *testing.T) {
@@ -300,7 +361,7 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
        topic := "persistent://public/default/testGetPartitions"
        testURL := adminURL + "/" + "admin/v2/persistent/public/default/testGetPartitions/partitions"
 
-       makeHTTPCall(t, http.MethodPut, testURL, "3")
+       makeHTTPCall(t, http.MethodPut, testURL, "5")
 
        // create producer
        producer, err := client.CreateProducer(ProducerOptions{
@@ -316,9 +377,10 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
        assert.Equal(t, topic+"-partition-2", topics[2])
 
        consumer, err := client.Subscribe(ConsumerOptions{
-               Topic:            topic,
-               SubscriptionName: "my-sub",
-               Type:             Exclusive,
+               Topic:             topic,
+               SubscriptionName:  "my-sub",
+               Type:              Exclusive,
+               ReceiverQueueSize: 10,
        })
        assert.Nil(t, err)
        defer consumer.Close()
@@ -348,3 +410,228 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
 
        assert.Equal(t, len(msgs), 10)
 }
+
+func TestConsumer_ReceiveAsync(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topicName := "persistent://public/default/receive-async"
+       subName := "subscription-receive-async"
+       ctx := context.Background()
+       ch := make(chan ConsumerMessage, 10)
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topicName,
+       })
+       defer producer.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: subName,
+       })
+       defer consumer.Close()
+
+       //send 10 messages
+       for i := 0; i < 10; i++ {
+               err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               })
+               assert.Nil(t, err)
+       }
+
+       //receive async 10 messages
+       err = consumer.ReceiveAsync(ctx, ch)
+       assert.Nil(t, err)
+
+       payloadList := make([]string, 0, 10)
+
+RECEIVE:
+       for {
+               select {
+               case cMsg, ok := <-ch:
+                       if ok {
+                               fmt.Printf("receive message payload is:%s\n", string(cMsg.Payload()))
+                               assert.Equal(t, topicName, cMsg.Message.Topic())
+                               assert.Equal(t, topicName, cMsg.Consumer.Topic())
+                               payloadList = append(payloadList, string(cMsg.Message.Payload()))
+                               if len(payloadList) == 10 {
+                                       break RECEIVE
+                               }
+                       }
+                       continue RECEIVE
+               case <-ctx.Done():
+                       t.Error("context error.")
+                       return
+               }
+       }
+}
+
+func TestConsumerAckTimeout(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := "test-ack-timeout-topic-1"
+       ctx := context.Background()
+
+       // create consumer
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: "my-sub1",
+               Type:             Shared,
+               AckTimeout:       5 * 1000,
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       // create consumer1
+       consumer1, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: "my-sub2",
+               Type:             Shared,
+               AckTimeout:       5 * 1000,
+       })
+       assert.Nil(t, err)
+       defer consumer1.Close()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topic,
+               DisableBatching: true,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       // send 10 messages
+       for i := 0; i < 10; i++ {
+               if err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               }); err != nil {
+                       log.Fatal(err)
+               }
+       }
+
+       // consumer receive 10 messages
+       payloadList := make([]string, 0, 10)
+       for i := 0; i < 10; i++ {
+               msg, err := consumer.Receive(context.Background())
+               if err != nil {
+                       log.Fatal(err)
+               }
+               payloadList = append(payloadList, string(msg.Payload()))
+
+               // not ack message
+       }
+       assert.Equal(t, 10, len(payloadList))
+
+       // consumer1 receive 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := consumer1.Receive(context.Background())
+               if err != nil {
+                       log.Fatal(err)
+               }
+
+               payloadList = append(payloadList, string(msg.Payload()))
+
+               // ack half of the messages
+               if i%2 == 0 {
+                       err = consumer1.Ack(msg)
+                       assert.Nil(t, err)
+               }
+       }
+
+       // wait ack timeout
+       time.Sleep(6 * time.Second)
+
+       fmt.Println("start redeliver messages...")
+
+       payloadList = make([]string, 0, 10)
+       // consumer receive messages again
+       for i := 0; i < 10; i++ {
+               msg, err := consumer.Receive(context.Background())
+               if err != nil {
+                       log.Fatal(err)
+               }
+               payloadList = append(payloadList, string(msg.Payload()))
+
+               // ack message
+               if err := consumer.Ack(msg); err != nil {
+                       log.Fatal(err)
+               }
+       }
+       assert.Equal(t, 10, len(payloadList))
+
+       payloadList = make([]string, 0, 5)
+       // consumer1 receive messages again
+       go func() {
+               for i := 0; i < 10; i++ {
+                       msg, err := consumer1.Receive(context.Background())
+                       if err != nil {
+                               log.Fatal(err)
+                       }
+
+                       expectMsg := fmt.Sprintf("hello-%d", i)
+                       fmt.Printf("redeliver messages, payload is:%s\n", expectMsg)
+                       payloadList = append(payloadList, string(msg.Payload()))
+
+                       // ack message
+                       if err := consumer1.Ack(msg); err != nil {
+                               log.Fatal(err)
+                       }
+               }
+               assert.Equal(t, 5, len(payloadList))
+       }()
+
+       // sleep 2 seconds, wait for the goroutine to receive messages.
+       time.Sleep(time.Second * 2)
+}
+
+func TestConsumer_ReceiveAsyncWithCallback(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topicName := "persistent://public/default/receive-async-with-callback"
+       subName := "subscription-receive-async"
+       ctx := context.Background()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topicName,
+       })
+       defer producer.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: subName,
+       })
+       defer consumer.Close()
+
+       //send 10 messages
+       for i := 0; i < 10; i++ {
+               err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               })
+               assert.Nil(t, err)
+       }
+
+       for i := 0; i < 10; i++ {
+               consumer.ReceiveAsyncWithCallback(ctx, func(msg Message, err error) {
+                       if err != nil {
+                               log.Fatal(err)
+                       }
+                       fmt.Printf("receive message payload is:%s\n", string(msg.Payload()))
+                       assert.Equal(t, fmt.Sprintf("hello-%d", i), string(msg.Payload()))
+               })
+       }
+}
diff --git a/pulsar/error.go b/pulsar/error.go
index 0231913..ec20844 100644
--- a/pulsar/error.go
+++ b/pulsar/error.go
@@ -24,7 +24,7 @@ type Result int
 
 const (
        // ResultOk means no errors
-       ResultOk = iota
+       ResultOk Result = iota
        // ResultUnknownError means unknown error happened on broker
        ResultUnknownError
        // ResultInvalidConfiguration means invalid configuration
diff --git a/pulsar/impl_consumer.go b/pulsar/impl_consumer.go
index 0a44971..13e72ae 100644
--- a/pulsar/impl_consumer.go
+++ b/pulsar/impl_consumer.go
@@ -79,6 +79,7 @@ func newConsumer(client *client, options *ConsumerOptions) (*consumer, error) {
 func singleTopicSubscribe(client *client, options *ConsumerOptions, topic string) (*consumer, error) {
        c := &consumer{
                topicName: topic,
+               log:       log.WithField("topic", topic),
                queue:     make(chan ConsumerMessage, options.ReceiverQueueSize),
        }
 
@@ -100,7 +101,7 @@ func singleTopicSubscribe(client *client, options *ConsumerOptions, topic string
 
        for partitionIdx, partitionTopic := range partitions {
                go func(partitionIdx int, partitionTopic string) {
-                       cons, err := newPartitionConsumer(client, partitionTopic, options, partitionIdx)
+                       cons, err := newPartitionConsumer(client, partitionTopic, options, partitionIdx, numPartitions, c.queue)
                        ch <- ConsumerError{
                                err:       err,
                                partition: partitionIdx,
@@ -153,31 +154,63 @@ func (c *consumer) Unsubscribe() error {
        return nil
 }
 
-func (c *consumer) Receive(ctx context.Context) (Message, error) {
+func (c *consumer) getMessageFromSubConsumer(ctx context.Context) {
        for _, pc := range c.consumers {
                go func(pc Consumer) {
-                       if err := pc.ReceiveAsync(ctx, c.queue); err != nil {
+                       err := pc.ReceiveAsync(ctx, c.queue)
+                       if err != nil {
                                return
                        }
                }(pc)
        }
+}
 
-       select {
-       case <-ctx.Done():
-               return nil, ctx.Err()
-       case msg, ok := <-c.queue:
-               if ok {
-                       return msg.Message, nil
+func (c *consumer) Receive(ctx context.Context) (message Message, err error) {
+       if len(c.consumers) > 1 {
+               select {
+               case <-ctx.Done():
+                       return nil, ctx.Err()
+               case cMsg, ok := <-c.queue:
+                       if ok {
+                               return cMsg.Message, nil
+                       }
+                       return nil, errors.New("receive message error")
                }
-               return nil, errors.New("receive message error")
        }
+
+       return c.consumers[0].(*partitionConsumer).Receive(ctx)
 }
 
 func (c *consumer) ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error {
-       //TODO: impl logic
+       for _, pc := range c.consumers {
+               go func(pc Consumer) {
+                       if err := pc.ReceiveAsync(ctx, msgs); err != nil {
+                               c.log.Errorf("receive async messages error:%s, please check.", err.Error())
+                               return
+                       }
+               }(pc)
+       }
+
        return nil
 }
 
+func (c *consumer) ReceiveAsyncWithCallback(ctx context.Context, callback func(msg Message, err error)) {
+       var err error
+       if len(c.consumers) > 1 {
+               select {
+               case <-ctx.Done():
+                       c.log.Errorf("ReceiveAsyncWithCallback: receive message error:%s", ctx.Err().Error())
+                       return
+               case cMsg, ok := <-c.queue:
+                       if ok {
+                               callback(cMsg.Message, err)
+                       }
+                       return
+               }
+       }
+       c.consumers[0].(*partitionConsumer).ReceiveAsyncWithCallback(ctx, callback)
+}
+
 //Ack the consumption of a single message
 func (c *consumer) Ack(msg Message) error {
        return c.AckID(msg.ID())
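
The rewrite above turns the multi-partition consumer into a fan-in: every partition consumer pushes into the shared c.queue channel, and Receive simply drains that channel when more than one partition exists. A stripped-down sketch of the same fan-in pattern, with plain strings standing in for ConsumerMessage (not the commit's code):

    package main

    import (
            "fmt"
            "sync"
    )

    func main() {
            queue := make(chan string, 10) // plays the role of consumer.queue
            var wg sync.WaitGroup

            // Three "partition consumers" push into the shared queue.
            for p := 0; p < 3; p++ {
                    wg.Add(1)
                    go func(partition int) {
                            defer wg.Done()
                            for i := 0; i < 2; i++ {
                                    queue <- fmt.Sprintf("partition-%d msg-%d", partition, i)
                            }
                    }(p)
            }

            go func() { wg.Wait(); close(queue) }()

            // The top-level Receive equivalent: read whatever arrives, in arrival order.
            for msg := range queue {
                    fmt.Println(msg)
            }
    }
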
diff --git a/pulsar/impl_partition_consumer.go b/pulsar/impl_partition_consumer.go
index 0d7069f..87cf68b 100644
--- a/pulsar/impl_partition_consumer.go
+++ b/pulsar/impl_partition_consumer.go
@@ -37,7 +37,7 @@ const maxRedeliverUnacknowledged = 1000
 type consumerState int
 
 const (
-       consumerInit = iota
+       consumerInit consumerState = iota
        consumerReady
        consumerClosing
        consumerClosed
@@ -60,16 +60,17 @@ type partitionConsumer struct {
        consumerID   uint64
        subQueue     chan ConsumerMessage
 
-       omu      sync.Mutex // protects following
-       overflow []*pb.MessageIdData
+       omu               sync.Mutex // protects following
+       redeliverMessages []*pb.MessageIdData
 
        unAckTracker *UnackedMessageTracker
 
        eventsChan   chan interface{}
        partitionIdx int
+       partitionNum int
 }
 
-func newPartitionConsumer(client *client, topic string, options *ConsumerOptions, partitionID int) (*partitionConsumer, error) {
+func newPartitionConsumer(client *client, topic string, options *ConsumerOptions, partitionID, partitionNum int, ch chan ConsumerMessage) (*partitionConsumer, error) {
        c := &partitionConsumer{
                state:        consumerInit,
                client:       client,
@@ -78,6 +79,7 @@ func newPartitionConsumer(client *client, topic string, options *ConsumerOptions
                log:          log.WithField("topic", topic),
                consumerID:   client.rpcClient.NewConsumerID(),
                partitionIdx: partitionID,
+               partitionNum: partitionNum,
                eventsChan:   make(chan interface{}),
                subQueue:     make(chan ConsumerMessage, options.ReceiverQueueSize),
        }
@@ -108,7 +110,7 @@ func newPartitionConsumer(client *client, topic string, options *ConsumerOptions
        if options.Type == Shared || options.Type == KeyShared {
                if options.AckTimeout != 0 {
                        c.unAckTracker = NewUnackedMessageTracker()
-                       c.unAckTracker.pc = c
+                       c.unAckTracker.pcs = append(c.unAckTracker.pcs, c)
                        c.unAckTracker.Start(int64(options.AckTimeout))
                }
        }
@@ -128,6 +130,18 @@ func newPartitionConsumer(client *client, topic string, options *ConsumerOptions
        c.log = c.log.WithField("name", c.consumerName)
        c.log.Info("Created consumer")
        c.state = consumerReady
+
+       // Here, open a goroutine to receive data asynchronously from the subConsumer,
+       // filling the queue channel of the current consumer.
+       if partitionNum > 1 {
+               go func() {
+                       err = c.ReceiveAsync(context.Background(), ch)
+                       if err != nil {
+                               return
+                       }
+               }()
+       }
+
        go c.runEventsLoop()
 
        return c, nil
@@ -238,35 +252,60 @@ func (pc *partitionConsumer) internalUnsubscribe(unsub *handleUnsubscribe) {
        unsub.waitGroup.Done()
 }
 
-func (pc *partitionConsumer) Receive(ctx context.Context) (Message, error) {
-       select {
-       case <-ctx.Done():
-               return nil, ctx.Err()
-       case cm, ok := <-pc.subQueue:
-               if ok {
-                       id := &pb.MessageIdData{}
-                       err := proto.Unmarshal(cm.ID().Serialize(), id)
-                       if err != nil {
-                               pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
-                               return nil, err
-                       }
-                       if pc.unAckTracker != nil {
-                               pc.unAckTracker.Add(id)
-                       }
-                       return cm.Message, nil
-               }
-               return nil, newError(ResultConnectError, "receive queue closed")
+func (pc *partitionConsumer) trackMessage(msgID MessageID) error {
+       id := &pb.MessageIdData{}
+       err := proto.Unmarshal(msgID.Serialize(), id)
+       if err != nil {
+               pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
+               return err
+       }
+       if pc.unAckTracker != nil {
+               pc.unAckTracker.Add(id)
        }
+       return nil
 }
 
-func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error {
+func (pc *partitionConsumer) increaseAvailablePermits(receivedSinceFlow uint32) error {
        highwater := uint32(math.Max(float64(cap(pc.options.MessageChannel)/2), 1))
+       if receivedSinceFlow >= highwater {
+               if err := pc.internalFlow(receivedSinceFlow); err != nil {
+                       pc.log.Errorf("Send Flow cmd error:%s", err.Error())
+                       return err
+               }
+               receivedSinceFlow = 0
+       }
+       return nil
+}
+
+func (pc *partitionConsumer) messageProcessed(msgID MessageID, receivedSinceFlow uint32) error {
+       err := pc.trackMessage(msgID)
+       if err != nil {
+               return err
+       }
+       receivedSinceFlow++
 
-       // request half the buffer's capacity
-       if err := pc.internalFlow(highwater); err != nil {
-               pc.log.Errorf("Send Flow cmd error:%s", err.Error())
+       err = pc.increaseAvailablePermits(receivedSinceFlow)
+       if err != nil {
                return err
        }
+
+       return nil
+}
+
+func (pc *partitionConsumer) Receive(ctx context.Context) (message Message, err error) {
+       wg := &sync.WaitGroup{}
+       wg.Add(1)
+       pc.ReceiveAsyncWithCallback(ctx, func(msg Message, e error) {
+               message = msg
+               err = e
+               wg.Done()
+       })
+       wg.Wait()
+
+       return message, err
+}
+
+func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error {
        var receivedSinceFlow uint32
 
        for {
@@ -274,30 +313,38 @@ func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- Consu
                case tmpMsg, ok := <-pc.subQueue:
                        if ok {
                                msgs <- tmpMsg
-                               id := &pb.MessageIdData{}
-                               err := proto.Unmarshal(tmpMsg.ID().Serialize(), id)
+
+                               err := pc.messageProcessed(tmpMsg.ID(), receivedSinceFlow)
                                 if err != nil {
-                                       pc.log.WithError(err).Errorf("unserialize message id error:%s", err.Error())
                                        return err
                                }
-                               if pc.unAckTracker != nil {
-                                       pc.unAckTracker.Add(id)
-                               }
-                               receivedSinceFlow++
-                               if receivedSinceFlow >= highwater {
-                                       if err := pc.internalFlow(receivedSinceFlow); err != nil {
-                                               pc.log.Errorf("Send Flow cmd error:%s", err.Error())
-                                               return err
-                                       }
-                                       receivedSinceFlow = 0
-                               }
                                continue
                        }
+                       break
                case <-ctx.Done():
                        return ctx.Err()
                }
        }
+}
 
+func (pc *partitionConsumer) ReceiveAsyncWithCallback(ctx context.Context, callback func(msg Message, err error)) {
+       var receivedSinceFlow uint32
+       var err error
+
+       select {
+       case tmpMsg, ok := <-pc.subQueue:
+               if ok {
+                       err = pc.messageProcessed(tmpMsg.ID(), receivedSinceFlow)
+                       callback(tmpMsg.Message, err)
+                       if err != nil {
+                               pc.log.Errorf("processed messages error:%s", err.Error())
+                               return
+                       }
+               }
+       case <-ctx.Done():
+               pc.log.Errorf("context shouldn't done, please check error:%s", ctx.Err().Error())
+               return
+       }
 }
 
 func (pc *partitionConsumer) Ack(msg Message) error {
@@ -465,23 +512,23 @@ func (pc *partitionConsumer) internalRedeliver(redeliver *handleRedeliver) {
        pc.omu.Lock()
        defer pc.omu.Unlock()
 
-       overFlowSize := len(pc.overflow)
+       redeliverMessagesSize := len(pc.redeliverMessages)
 
-       if overFlowSize == 0 {
+       if redeliverMessagesSize == 0 {
                return
        }
 
        requestID := pc.client.rpcClient.NewRequestID()
 
-       for i := 0; i < len(pc.overflow); i += maxRedeliverUnacknowledged {
+       for i := 0; i < len(pc.redeliverMessages); i += maxRedeliverUnacknowledged {
                end := i + maxRedeliverUnacknowledged
-               if end > overFlowSize {
-                       end = overFlowSize
+               if end > redeliverMessagesSize {
+                       end = redeliverMessagesSize
                }
                _, err := pc.client.rpcClient.RequestOnCnxNoWait(pc.cnx, requestID,
                        pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, &pb.CommandRedeliverUnacknowledgedMessages{
                                ConsumerId: proto.Uint64(pc.consumerID),
-                               MessageIds: pc.overflow[i:end],
+                               MessageIds: pc.redeliverMessages[i:end],
                        })
                if err != nil {
                        pc.log.WithError(err).Error("Failed to unsubscribe consumer")
@@ -489,8 +536,8 @@ func (pc *partitionConsumer) internalRedeliver(redeliver *handleRedeliver) {
                }
        }
 
-       // clear Overflow slice
-       pc.overflow = nil
+       // clear redeliverMessages slice
+       pc.redeliverMessages = nil
 
        if pc.unAckTracker != nil {
                pc.unAckTracker.clear()
@@ -574,56 +621,58 @@ func (pc *partitionConsumer) internalFlow(permits uint32) error {
        return nil
 }
 
-func (pc *partitionConsumer) HandlerMessage(response *pb.CommandMessage, headersAndPayload []byte) error {
+func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, headersAndPayload []byte) error {
        msgID := response.GetMessageId()
 
        id := newMessageID(int64(msgID.GetLedgerId()), int64(msgID.GetEntryId()),
                int(msgID.GetBatchIndex()), pc.partitionIdx)
 
-       msgMeta, payload, err := internal.ParseMessage(headersAndPayload)
+       msgMeta, payloadList, err := internal.ParseMessage(headersAndPayload)
        if err != nil {
                return fmt.Errorf("parse message error:%s", err)
        }
 
-       //numMsgs := msgMeta.GetNumMessagesInBatch()
-
-       msg := &message{
-               publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
-               eventTime:   timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
-               key:         msgMeta.GetPartitionKey(),
-               properties:  internal.ConvertToStringMap(msgMeta.GetProperties()),
-               topic:       pc.topic,
-               msgID:       id,
-               payLoad:     payload,
-       }
+       for _, payload := range payloadList {
+               msg := &message{
+                       publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+                       eventTime:   timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
+                       key:         msgMeta.GetPartitionKey(),
+                       properties:  internal.ConvertToStringMap(msgMeta.GetProperties()),
+                       topic:       pc.topic,
+                       msgID:       id,
+                       payLoad:     payload,
+               }
 
-       consumerMsg := ConsumerMessage{
-               Message:  msg,
-               Consumer: pc,
-       }
+               consumerMsg := ConsumerMessage{
+                       Message:  msg,
+                       Consumer: pc,
+               }
 
-       select {
-       case pc.subQueue <- consumerMsg:
-               // Add messageId to Overflow buffer, avoiding duplicates.
-               newMid := response.GetMessageId()
-               var dup bool
-
-               pc.omu.Lock()
-               for _, mid := range pc.overflow {
-                       if proto.Equal(mid, newMid) {
-                               dup = true
-                               break
+               select {
+               case pc.subQueue <- consumerMsg:
+                       //Add messageId to redeliverMessages buffer, avoiding duplicates.
+                       newMid := response.GetMessageId()
+                       var dup bool
+
+                       pc.omu.Lock()
+                       for _, mid := range pc.redeliverMessages {
+                               if proto.Equal(mid, newMid) {
+                                       dup = true
+                                       break
+                               }
                        }
-               }
 
-               if !dup {
-                       pc.overflow = append(pc.overflow, newMid)
+                       if !dup {
+                               pc.redeliverMessages = append(pc.redeliverMessages, newMid)
+                       }
+                       pc.omu.Unlock()
+                       continue
+               default:
+                       return fmt.Errorf("consumer message channel on topic %s is full (capacity = %d)", pc.Topic(), cap(pc.options.MessageChannel))
                }
-               pc.omu.Unlock()
-               return nil
-       default:
-       return fmt.Errorf("consumer message channel on topic %s is full (capacity = %d)", pc.Topic(), cap(pc.options.MessageChannel))
        }
+
+       return nil
 }
 
 type handleAck struct {
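
The new messageProcessed and increaseAvailablePermits helpers above factor out the client-side flow control: messages taken off the queue are counted, and once the count reaches the highwater mark (half the receiver queue capacity) a FLOW command asks the broker for more permits. A simplified, self-contained sketch of that accounting; sendFlow is a stand-in for internalFlow and the capacities are made up for illustration:

    package main

    import "fmt"

    // sendFlow is a stand-in for the FLOW command sent to the broker.
    func sendFlow(permits uint32) {
            fmt.Printf("FLOW: requesting %d more permits\n", permits)
    }

    func main() {
            queueCapacity := uint32(10)
            highwater := queueCapacity / 2 // threshold used by the refactored code
            var receivedSinceFlow uint32

            for i := 0; i < 25; i++ { // pretend we processed 25 messages
                    receivedSinceFlow++
                    if receivedSinceFlow >= highwater {
                            sendFlow(receivedSinceFlow)
                            receivedSinceFlow = 0
                    }
            }
    }
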
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index 98e2156..e00c1e1 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -34,7 +34,7 @@ import (
 type producerState int
 
 const (
-       producerInit = iota
+       producerInit producerState = iota
        producerReady
        producerClosing
        producerClosed
@@ -249,7 +249,8 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
        sequenceID := internal.GetAndAdd(p.sequenceIDGenerator, 1)
 
        if sendAsBatch {
-               for p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters) == false {
+               ok := p.batchBuilder.Add(smm, sequenceID, msg.Payload, request, msg.ReplicationClusters)
+               if ok == false {
                        // The current batch is full.. flush it and retry
                        p.internalFlushCurrentBatch()
                }
@@ -321,13 +322,25 @@ func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) erro
 func (p *partitionProducer) SendAsync(ctx context.Context, msg *ProducerMessage,
        callback func(MessageID, *ProducerMessage, error)) {
        p.publishSemaphore.Acquire()
-       p.eventsChan <- &sendRequest{ctx, msg, callback, false}
+       sr := &sendRequest{
+               ctx:              ctx,
+               msg:              msg,
+               callback:         callback,
+               flushImmediately: false,
+       }
+       p.eventsChan <- sr
 }
 
 func (p *partitionProducer) internalSendAsync(ctx context.Context, msg *ProducerMessage,
        callback func(MessageID, *ProducerMessage, error), flushImmediately bool) {
        p.publishSemaphore.Acquire()
-       p.eventsChan <- &sendRequest{ctx, msg, callback, flushImmediately}
+       sr := &sendRequest{
+               ctx:              ctx,
+               msg:              msg,
+               callback:         callback,
+               flushImmediately: flushImmediately,
+       }
+       p.eventsChan <- sr
 }
 
 func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt) {
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 04545c1..1f29f7f 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -87,7 +87,7 @@ func addSingleMessageToBatch(wb Buffer, smm *pb.SingleMessageMetadata, payload [
        wb.Write(payload)
 }
 
-func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, payload []byte, err error) {
+func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, payloadList [][]byte, err error) {
        // reusable buffer for 4-byte uint32s
        buf32 := make([]byte, 4)
        r := bytes.NewReader(headersAndPayload)
@@ -164,33 +164,63 @@ func ParseMessage(headersAndPayload []byte) (msgMeta *pb.MessageMetadata, payloa
                return nil, nil, err
        }
 
+       numMsg := msgMeta.GetNumMessagesInBatch()
+
+       if numMsg > 0 && msgMeta.NumMessagesInBatch != nil {
+               payloads := make([]byte, lr.N)
+               if _, err = io.ReadFull(lr, payloads); err != nil {
+                       return nil, nil, err
+               }
+
+               singleMessages, err := decodeBatchPayload(payloads, numMsg)
+               if err != nil {
+                       return nil, nil, err
+               }
+
+               payloadList = make([][]byte, 0, numMsg)
+               for _, singleMsg := range singleMessages {
+                       msgMeta.PartitionKey = singleMsg.SingleMeta.PartitionKey
+                       msgMeta.Properties = singleMsg.SingleMeta.Properties
+                       msgMeta.EventTime = singleMsg.SingleMeta.EventTime
+                       payloadList = append(payloadList, singleMsg.SinglePayload)
+               }
+
+               if err := computeChecksum(chksum, expectedChksum); err != nil {
+                       return nil, nil, err
+               }
+               return msgMeta, payloadList, nil
+       }
        // Anything left in the frame is considered
        // the payload and can be any sequence of bytes.
-       payloads := make([]byte, lr.N)
-       if _, err = io.ReadFull(lr, payloads); err != nil {
-               return nil, nil, err
-       }
+       payloadList = make([][]byte, 0, 10)
+       if lr.N > 0 {
+               // guard against allocating large buffer
+               if lr.N > MaxFrameSize {
+                       return nil, nil, fmt.Errorf("frame payload size (%d) cannot be greater than max frame size (%d)", lr.N, MaxFrameSize)
+               }
 
-       numMsg := msgMeta.GetNumMessagesInBatch()
+               payload := make([]byte, lr.N)
+               if _, err = io.ReadFull(lr, payload); err != nil {
+                       return nil, nil, err
+               }
 
-       singleMessages, err := decodeBatchPayload(payloads, numMsg)
-       if err != nil {
-               return nil, nil, err
+               payloadList = append(payloadList, payload)
        }
 
-       for _, singleMsg := range singleMessages {
-               payload = singleMsg.SinglePayload
-               msgMeta.PartitionKey = singleMsg.SingleMeta.PartitionKey
-               msgMeta.Properties = singleMsg.SingleMeta.Properties
-               msgMeta.EventTime = singleMsg.SingleMeta.EventTime
+       if err := computeChecksum(chksum, expectedChksum); err != nil {
+               return nil, nil, err
        }
 
-       if computed := chksum.compute(); !bytes.Equal(computed, expectedChksum) {
-               return nil, nil, fmt.Errorf("checksum mismatch: computed (0x%X) does "+
+       return msgMeta, payloadList, nil
+}
+
+func computeChecksum(chksum CheckSum, expectedChksum []byte) error {
+       computed := chksum.compute()
+       if !bytes.Equal(computed, expectedChksum) {
+               return fmt.Errorf("checksum mismatch: computed (0x%X) does "+
                        "not match given checksum (0x%X)", computed, expectedChksum)
        }
-
-       return msgMeta, payload, nil
+       return nil
 }
 
 func serializeBatch(wb Buffer, cmdSend *pb.BaseCommand, msgMetadata *pb.MessageMetadata, payload []byte) {
@@ -252,7 +282,7 @@ type singleMessage struct {
 func decodeBatchPayload(bp []byte, batchNum int32) ([]*singleMessage, error) {
        buf32 := make([]byte, 4)
        rdBuf := bytes.NewReader(bp)
-       list := make([]*singleMessage, 0, batchNum)
+       singleMsgList := make([]*singleMessage, 0, batchNum)
        for i := int32(0); i < batchNum; i++ {
                // singleMetaSize
                if _, err := io.ReadFull(rdBuf, buf32); err != nil {
@@ -274,13 +304,15 @@ func decodeBatchPayload(bp []byte, batchNum int32) ([]*singleMessage, error) {
                if _, err := io.ReadFull(rdBuf, singlePayload); err != nil {
                        return nil, err
                }
-               d := &singleMessage{}
-               d.SingleMetaSize = singleMetaSize
-               d.SingleMeta = singleMeta
-               d.SinglePayload = singlePayload
-               list = append(list, d)
+               singleMsg := &singleMessage{
+                       SingleMetaSize: singleMetaSize,
+                       SingleMeta:     singleMeta,
+                       SinglePayload:  singlePayload,
+               }
+
+               singleMsgList = append(singleMsgList, singleMsg)
        }
-       return list, nil
+       return singleMsgList, nil
 }
 
 // ConvertFromStringMap convert a string map to a KeyValue []byte
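
computeChecksum, factored out above, compares the digest computed over the frame body with the checksum carried in the frame (Pulsar uses CRC32-C). A standalone sketch of the same comparison using Go's hash/crc32; the byte layout here is an assumption for illustration, and the real client keeps its own running CheckSum implementation:

    package main

    import (
            "bytes"
            "encoding/binary"
            "fmt"
            "hash/crc32"
    )

    var castagnoli = crc32.MakeTable(crc32.Castagnoli)

    // computeChecksum mirrors the shape of the helper in the diff: compare a
    // computed digest with the checksum bytes carried in the frame.
    func computeChecksum(payload, expected []byte) error {
            computed := make([]byte, 4)
            binary.BigEndian.PutUint32(computed, crc32.Checksum(payload, castagnoli))
            if !bytes.Equal(computed, expected) {
                    return fmt.Errorf("checksum mismatch: computed (0x%X) does not match given checksum (0x%X)", computed, expected)
            }
            return nil
    }

    func main() {
            payload := []byte("hello pulsar")
            expected := make([]byte, 4)
            binary.BigEndian.PutUint32(expected, crc32.Checksum(payload, castagnoli))
            fmt.Println(computeChecksum(payload, expected)) // <nil>
    }
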
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index d83f30b..2c707ea 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -64,7 +64,7 @@ type Connection interface {
 }
 
 type ConsumerHandler interface {
-    HandlerMessage(response *pb.CommandMessage, headersAndPayload []byte) error
+       MessageReceived(response *pb.CommandMessage, headersAndPayload []byte) error
 }
 
 type connectionState int
@@ -131,7 +131,7 @@ func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSO
                incomingRequests: make(chan *request),
                writeRequests:    make(chan []byte),
                listeners:        make(map[uint64]ConnectionListener),
-        connWrapper:      NewConnWrapper(),
+               connWrapper:      NewConnWrapper(),
        }
        cnx.reader = newConnectionReader(cnx)
        cnx.cond = sync.NewCond(cnx)
@@ -307,7 +307,7 @@ func (c *connection) writeCommand(cmd proto.Message) {
 func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []byte) {
        c.log.Debugf("Received command: %s -- payload: %v", cmd, headersAndPayload)
        c.lastDataReceivedTime = time.Now()
-    var err error
+       var err error
 
        switch *cmd.Type {
        case pb.BaseCommand_SUCCESS:
@@ -344,7 +344,7 @@ func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []by
        case pb.BaseCommand_SEND_ERROR:
 
        case pb.BaseCommand_MESSAGE:
-        err = c.handleMessage(cmd.GetMessage(), headersAndPayload)
+               err = c.handleMessage(cmd.GetMessage(), headersAndPayload)
        case pb.BaseCommand_PING:
                c.handlePing()
        case pb.BaseCommand_PONG:
@@ -353,9 +353,9 @@ func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload []by
        case pb.BaseCommand_ACTIVE_CONSUMER_CHANGE:
 
        default:
-        if err != nil {
-            c.log.Errorf("Received invalid command type: %s", cmd.Type)
-        }
+               if err != nil {
+                       c.log.Errorf("Received invalid command type: %s", cmd.Type)
+               }
                c.Close()
        }
 }
@@ -403,18 +403,18 @@ func (c *connection) handleSendReceipt(response *pb.CommandSendReceipt) {
 }
 
 func (c *connection) handleMessage(response *pb.CommandMessage, payload []byte) error {
-    c.log.Debug("Got Message: ", response)
-    consumerId := response.GetConsumerId()
-    if consumer, ok := c.connWrapper.Consumers[consumerId]; ok {
-        err := consumer.HandlerMessage(response, payload)
-        if err != nil {
-            c.log.WithField("consumerId", consumerId).Error("handle message err: ", response.MessageId)
-            return errors.New("handler not found")
-        }
-    } else {
-        c.log.WithField("consumerId", consumerId).Warn("Got unexpected message: ", response.MessageId)
-    }
-    return nil
+       c.log.Debug("Got Message: ", response)
+       consumerId := response.GetConsumerId()
+       if consumer, ok := c.connWrapper.Consumers[consumerId]; ok {
+               err := consumer.MessageReceived(response, payload)
+               if err != nil {
+                       c.log.WithField("consumerId", consumerId).Error("handle message err: ", response.MessageId)
+                       return errors.New("handler not found")
+               }
+       } else {
+               c.log.WithField("consumerId", consumerId).Warn("Got unexpected message: ", response.MessageId)
+       }
+       return nil
 }
 
 func (c *connection) sendPing() {
@@ -522,8 +522,8 @@ func (c *connection) getTLSConfig() (*tls.Config, error) {
 }
 
 type ConnWrapper struct {
-       Rwmu             sync.RWMutex
-       Consumers        map[uint64]ConsumerHandler
+       Rwmu      sync.RWMutex
+       Consumers map[uint64]ConsumerHandler
 }
 
 func NewConnWrapper() *ConnWrapper {
@@ -533,13 +533,13 @@ func NewConnWrapper() *ConnWrapper {
 }
 
 func (c *connection) AddConsumeHandler(id uint64, handler ConsumerHandler) {
-    c.connWrapper.Rwmu.Lock()
-    c.connWrapper.Consumers[id] = handler
-    c.connWrapper.Rwmu.Unlock()
+       c.connWrapper.Rwmu.Lock()
+       c.connWrapper.Consumers[id] = handler
+       c.connWrapper.Rwmu.Unlock()
 }
 
 func (c *connection) DeleteConsumeHandler(id uint64) {
-    c.connWrapper.Rwmu.Lock()
-    delete(c.connWrapper.Consumers, id)
-    c.connWrapper.Rwmu.Unlock()
+       c.connWrapper.Rwmu.Lock()
+       delete(c.connWrapper.Consumers, id)
+       c.connWrapper.Rwmu.Unlock()
 }
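
The connection code above keeps a consumer-ID to ConsumerHandler map inside ConnWrapper, guarded by an RWMutex, and handleMessage dispatches each incoming MESSAGE command to the registered handler. The same register/dispatch pattern in isolation, with a toy handler type rather than the client's:

    package main

    import (
            "fmt"
            "sync"
    )

    type handler interface {
            MessageReceived(payload []byte) error
    }

    type printHandler struct{ id uint64 }

    func (p *printHandler) MessageReceived(payload []byte) error {
            fmt.Printf("consumer %d got %q\n", p.id, payload)
            return nil
    }

    type registry struct {
            mu       sync.RWMutex
            handlers map[uint64]handler
    }

    func (r *registry) add(id uint64, h handler) { r.mu.Lock(); r.handlers[id] = h; r.mu.Unlock() }
    func (r *registry) remove(id uint64)         { r.mu.Lock(); delete(r.handlers, id); r.mu.Unlock() }

    func (r *registry) dispatch(id uint64, payload []byte) {
            r.mu.RLock()
            h, ok := r.handlers[id]
            r.mu.RUnlock()
            if !ok {
                    fmt.Println("unexpected message for consumer", id)
                    return
            }
            _ = h.MessageReceived(payload)
    }

    func main() {
            r := &registry{handlers: map[uint64]handler{}}
            r.add(1, &printHandler{id: 1})
            r.dispatch(1, []byte("hello"))
            r.remove(1)
            r.dispatch(1, []byte("dropped"))
    }
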
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 66683d0..e9327d0 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -20,6 +20,7 @@ package pulsar
 import (
        "context"
        "fmt"
+       "net/http"
        "sync"
        "testing"
        "time"
@@ -30,6 +31,55 @@ import (
        log "github.com/sirupsen/logrus"
 )
 
+func TestInvalidURL(t *testing.T) {
+       client, err := NewClient(ClientOptions{})
+
+       if client != nil || err == nil {
+               t.Fatal("Should have failed to create client")
+       }
+}
+
+func TestProducerConnectError(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: "pulsar://invalid-hostname:6650",
+       })
+
+       assert.Nil(t, err)
+
+       defer client.Close()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: newTopicName(),
+       })
+
+       // Expect error in creating producer
+       assert.Nil(t, producer)
+       assert.NotNil(t, err)
+
+       assert.Equal(t, err.Error(), "connection error")
+}
+
+func TestProducerNoTopic(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: "pulsar://localhost:6650",
+       })
+
+       if err != nil {
+               t.Fatal(err)
+               return
+       }
+
+       defer client.Close()
+
+       producer, err := client.CreateProducer(ProducerOptions{})
+
+       // Expect error in creating producer
+       assert.Nil(t, producer)
+       assert.NotNil(t, err)
+
+       assert.Equal(t, err.(*Error).Result(), ResultInvalidTopicName)
+}
+
 func TestSimpleProducer(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: serviceURL,
@@ -92,7 +142,8 @@ func TestProducerAsyncSend(t *testing.T) {
                assert.NoError(t, err)
        }
 
-       producer.Flush()
+       err = producer.Flush()
+       assert.Nil(t, err)
 
        wg.Wait()
 
@@ -181,6 +232,213 @@ func TestProducerLastSequenceID(t *testing.T) {
        assert.NoError(t, err)
 }
 
+func TestEventTime(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       topicName := "test-event-time"
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topicName,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: "subName",
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       eventTime := timeFromUnixTimestampMillis(uint64(1565161612))
+       err = producer.Send(context.Background(), &ProducerMessage{
+               Payload:   []byte(fmt.Sprintf("test-event-time")),
+               EventTime: &eventTime,
+       })
+       assert.Nil(t, err)
+
+       msg, err := consumer.Receive(context.Background())
+       assert.Nil(t, err)
+       actualEventTime := msg.EventTime()
+       assert.Equal(t, eventTime.Unix(), actualEventTime.Unix())
+}
+
+func TestFlushInProducer(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       topicName := "test-flush-in-producer"
+       subName := "subscription-name"
+       numOfMessages := 10
+       ctx := context.Background()
+
+       // set batch message number numOfMessages, and max delay 10s
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:                   topicName,
+               DisableBatching:         false,
+               BatchingMaxMessages:     uint(numOfMessages),
+               BatchingMaxPublishDelay: time.Second * 10,
+               BlockIfQueueFull:        true,
+               Properties: map[string]string{
+                       "producer-name": "test-producer-name",
+                       "producer-id":   "test-producer-id",
+               },
+       })
+       defer producer.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: subName,
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       prefix := "msg-batch-async"
+       msgCount := 0
+
+       wg := sync.WaitGroup{}
+       wg.Add(5)
+       errors := util.NewBlockingQueue(10)
+       for i := 0; i < numOfMessages/2; i++ {
+               messageContent := prefix + fmt.Sprintf("%d", i)
+               producer.SendAsync(ctx, &ProducerMessage{
+                       Payload: []byte(messageContent),
+               }, func(id MessageID, producerMessage *ProducerMessage, e error) {
+                       if e != nil {
+                               log.WithError(e).Error("Failed to publish")
+                               errors.Put(e)
+                       } else {
+                               log.Info("Published message ", id)
+                       }
+                       wg.Done()
+               })
+               assert.Nil(t, err)
+       }
+       err = producer.Flush()
+       assert.Nil(t, err)
+       wg.Wait()
+
+       for i := 0; i < numOfMessages/2; i++ {
+               _, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               msgCount++
+       }
+
+       assert.Equal(t, msgCount, numOfMessages/2)
+
+       wg.Add(5)
+       for i := numOfMessages / 2; i < numOfMessages; i++ {
+               messageContent := prefix + fmt.Sprintf("%d", i)
+               producer.SendAsync(ctx, &ProducerMessage{
+                       Payload: []byte(messageContent),
+               }, func(id MessageID, producerMessage *ProducerMessage, e error) {
+                       if e != nil {
+                               log.WithError(e).Error("Failed to publish")
+                               errors.Put(e)
+                       } else {
+                               log.Info("Published message ", id)
+                       }
+                       wg.Done()
+               })
+               assert.Nil(t, err)
+       }
+
+       err = producer.Flush()
+       assert.Nil(t, err)
+       wg.Wait()
+
+       for i := numOfMessages / 2; i < numOfMessages; i++ {
+               _, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               msgCount++
+       }
+       assert.Equal(t, msgCount, numOfMessages)
+}
+
+func TestFlushInPartitionedProducer(t *testing.T) {
+       topicName := "persistent://public/default/partition-testFlushInPartitionedProducer"
+
+       // call admin api to make it partitioned
+       url := adminURL + "/" + "admin/v2/" + topicName + "/partitions"
+       makeHTTPCall(t, http.MethodPut, url, "5")
+
+       numberOfPartitions := 5
+       numOfMessages := 10
+       ctx := context.Background()
+
+       // create client connection
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.NoError(t, err)
+       defer client.Close()
+
+       // create consumer
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: "my-sub",
+               Type:             Exclusive,
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       // create producer and set batch message number numOfMessages, and max delay 10s
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:                   topicName,
+               DisableBatching:         false,
+               BatchingMaxMessages:     uint(numOfMessages / numberOfPartitions),
+               BatchingMaxPublishDelay: time.Second * 10,
+               BlockIfQueueFull:        true,
+       })
+       defer producer.Close()
+
+       // send 5 messages
+       prefix := "msg-batch-async-"
+       wg := sync.WaitGroup{}
+       wg.Add(5)
+       errors := util.NewBlockingQueue(5)
+       for i := 0; i < numOfMessages/2; i++ {
+               messageContent := prefix + fmt.Sprintf("%d", i)
+               producer.SendAsync(ctx, &ProducerMessage{
+                       Payload: []byte(messageContent),
+               }, func(id MessageID, producerMessage *ProducerMessage, e error) {
+                       if e != nil {
+                               log.WithError(e).Error("Failed to publish")
+                               errors.Put(e)
+                       } else {
+                               log.Info("Published message: ", id)
+                       }
+                       wg.Done()
+               })
+               assert.Nil(t, err)
+       }
+
+       // After flush, should be able to consume.
+       err = producer.Flush()
+       assert.Nil(t, err)
+
+       wg.Wait()
+
+       // Receive all messages
+       msgCount := 0
+       for i := 0; i < numOfMessages/2; i++ {
+               msg, err := consumer.Receive(ctx)
+               fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
+                       msg.ID(), string(msg.Payload()))
+               assert.Nil(t, err)
+               err = consumer.Ack(msg)
+               assert.Nil(t, err)
+               msgCount++
+       }
+       assert.Equal(t, msgCount, numOfMessages/2)
+}
+
 func TestMessageRouter(t *testing.T) {
        // Create topic with 5 partitions
        httpPut("http://localhost:8080/admin/v2/persistent/public/default/my-partitioned-topic/partitions", 5)
diff --git a/pulsar/unackedMsgTracker.go b/pulsar/unackedMsgTracker.go
index 8ec51c6..c46b731 100644
--- a/pulsar/unackedMsgTracker.go
+++ b/pulsar/unackedMsgTracker.go
@@ -34,7 +34,6 @@ type UnackedMessageTracker struct {
        oldOpenSet set.Set
        timeout    *time.Ticker
 
-       pc  *partitionConsumer
        pcs []*partitionConsumer
 }
 
@@ -159,22 +158,7 @@ func (t *UnackedMessageTracker) handlerCmd(ackTimeoutMillis int64) {
 
                                t.oldOpenSet.Clear()
 
-                               if t.pc != nil {
-                                       requestID := t.pc.client.rpcClient.NewRequestID()
-                                       cmd := &pb.CommandRedeliverUnacknowledgedMessages{
-                                               ConsumerId: proto.Uint64(t.pc.consumerID),
-                                               MessageIds: messageIds,
-                                       }
-
-                                       _, err := t.pc.client.rpcClient.RequestOnCnx(t.pc.cnx, requestID,
-                                               pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, cmd)
-                                       if err != nil {
-                                               t.pc.log.WithError(err).Error("Failed to unsubscribe consumer")
-                                               return
-                                       }
-
-                                       log.Debugf("consumer:%v redeliver messages num:%d", t.pc.consumerName, len(messageIds))
-                               } else if t.pcs != nil {
+                               if t.pcs != nil {
                                        messageIdsMap := 
make(map[int32][]*pb.MessageIdData)
                                        for _, msgID := range messageIds {
                                                
messageIdsMap[msgID.GetPartition()] = 
append(messageIdsMap[msgID.GetPartition()], msgID)
@@ -198,7 +182,7 @@ func (t *UnackedMessageTracker) handlerCmd(ackTimeoutMillis int64) {
                                        }
                                }
                        }
-                       log.Debug("Tick at ", tick)
+                       log.Debugf("Tick at: %v", tick)
                }
 
                t.toggle()
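
With the single-consumer branch removed above, the unacked-message tracker always groups timed-out message IDs by partition and asks each owning partition consumer to redeliver its share. A small sketch of that grouping step, with a simplified messageID type standing in for pb.MessageIdData:

    package main

    import "fmt"

    type messageID struct {
            Partition int32
            LedgerID  int64
            EntryID   int64
    }

    func main() {
            timedOut := []messageID{
                    {Partition: 0, LedgerID: 1, EntryID: 1},
                    {Partition: 2, LedgerID: 1, EntryID: 2},
                    {Partition: 0, LedgerID: 1, EntryID: 3},
            }

            byPartition := make(map[int32][]messageID)
            for _, id := range timedOut {
                    byPartition[id.Partition] = append(byPartition[id.Partition], id)
            }

            for partition, ids := range byPartition {
                    // The real tracker sends REDELIVER_UNACKNOWLEDGED_MESSAGES to the
                    // partition consumer that owns this partition index.
                    fmt.Printf("partition %d: redeliver %d message(s)\n", partition, len(ids))
            }
    }
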
diff --git a/util/util.go b/util/util.go
index 06a7e53..bd4f5d6 100644
--- a/util/util.go
+++ b/util/util.go
@@ -18,15 +18,27 @@
 package util
 
 import (
-    `reflect`
+       "reflect"
 )
 
 // IsNil check if the interface is nil
 func IsNil(i interface{}) bool {
-    vi := reflect.ValueOf(i)
-    if vi.Kind() == reflect.Ptr {
-        return vi.IsNil()
-    }
-    return false
+       vi := reflect.ValueOf(i)
+       if vi.Kind() == reflect.Ptr {
+               return vi.IsNil()
+       }
+       return false
 }
 
+// RemoveDuplicateElement remove repeating elements from the string slice
+func RemoveDuplicateElement(addrs []string) []string {
+       result := make([]string, 0, len(addrs))
+       temp := map[string]struct{}{}
+       for _, item := range addrs {
+               if _, ok := temp[item]; !ok {
+                       temp[item] = struct{}{}
+                       result = append(result, item)
+               }
+       }
+       return result
+}
diff --git a/util/util_test.go b/util/util_test.go
index 2e1195c..284dd0c 100644
--- a/util/util_test.go
+++ b/util/util_test.go
@@ -18,14 +18,23 @@
 package util
 
 import (
-    `github.com/stretchr/testify/assert`
-    `testing`
+       "fmt"
+       "github.com/stretchr/testify/assert"
+       "strings"
+       "testing"
 )
 
 func TestIsNil(t *testing.T) {
-    var a interface{} = nil
-    var b interface{} = (*int)(nil)
+       var a interface{} = nil
+       var b interface{} = (*int)(nil)
 
-    assert.True(t, a == nil)
-    assert.False(t, b == nil)
+       assert.True(t, a == nil)
+       assert.False(t, b == nil)
+}
+
+func TestRemoveDuplicateElement(t *testing.T) {
+       s := []string{"hello", "world", "hello", "golang", "hello", "ruby", "php", "java"}
+       resList := RemoveDuplicateElement(s)
+       res := fmt.Sprintf("%s", resList[:])
+       assert.Equal(t, 1, strings.Count(res, "hello"))
 }
