This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 39d7cdc  Simplify and refactor parts of  the single topic consumer. 
(#86)
39d7cdc is described below

commit 39d7cdc46ac49299148f293ae9d5fa154d29b8ad
Author: cckellogg <[email protected]>
AuthorDate: Mon Nov 11 13:57:45 2019 -0800

    Simplify and refactor parts of  the single topic consumer. (#86)
    
    * Simplify and refactor parts of  the single topic consumer.
    
    * Wait when closing a consumer and change get chan method.
---
 pulsar/consumer.go                |  49 +--
 pulsar/consumer_impl.go           | 278 ++++++++++++++
 pulsar/consumer_partition.go      | 510 ++++++++++++++++++++++++++
 pulsar/consumer_test.go           | 442 ++++-------------------
 util/error.go => pulsar/helper.go |  12 +-
 pulsar/impl_client.go             |   9 +-
 pulsar/impl_consumer.go           | 316 ----------------
 pulsar/impl_partition_consumer.go | 739 --------------------------------------
 pulsar/internal/connection.go     |  19 +-
 pulsar/internal/rpc_client.go     |   3 +
 pulsar/test_helper.go             |  20 ++
 pulsar/unacked_msg_tracker.go     |   2 +-
 util/util.go                      |  44 ---
 util/util_test.go                 |  41 ---
 14 files changed, 914 insertions(+), 1570 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index c259cd6..3b729ec 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -35,11 +35,13 @@ const (
        // Exclusive there can be only 1 consumer on the same topic with the 
same subscription name
        Exclusive SubscriptionType = iota
 
-       // Shared subscription mode, multiple consumer will be able to use the 
same subscription name and the messages will be dispatched according to
+       // Shared subscription mode, multiple consumer will be able to use the 
same subscription name
+       // and the messages will be dispatched according to
        // a round-robin rotation between the connected consumers
        Shared
 
-       // Failover subscription mode, multiple consumer will be able to use 
the same subscription name but only 1 consumer will receive the messages.
+       // Failover subscription mode, multiple consumer will be able to use 
the same subscription name
+       // but only 1 consumer will receive the messages.
        // If that consumer disconnects, one of the other connected consumers 
will start receiving messages.
        Failover
 
@@ -48,14 +50,14 @@ const (
        KeyShared
 )
 
-type InitialPosition int
+type SubscriptionInitialPosition int
 
 const (
        // Latest position which means the start consuming position will be the 
last message
-       Latest InitialPosition = iota
+       SubscriptionPositionLatest SubscriptionInitialPosition = iota
 
        // Earliest position which means the start consuming position will be 
the first message
-       Earliest
+       SubscriptionPositionEarliest
 )
 
 // ConsumerOptions is used to configure and create instances of Consumer
@@ -91,7 +93,7 @@ type ConsumerOptions struct {
 
        // InitialPosition at which the cursor will be set when subscribe
        // Default is `Latest`
-       SubscriptionInitPos InitialPosition
+       SubscriptionInitialPosition
 
        // Sets a `MessageChannel` for the consumer
        // When a message is received, it will be pushed to the channel for 
consumption
@@ -139,11 +141,8 @@ type Consumer interface {
        // This calls blocks until a message is available.
        Receive(context.Context) (Message, error)
 
-       // ReceiveAsync appends the message to the msgs channel asynchronously.
-       ReceiveAsync(ctx context.Context, msgs chan<- ConsumerMessage) error
-
-       // ReceiveAsyncWithCallback returns a callback containing the message 
and error objects
-       ReceiveAsyncWithCallback(ctx context.Context, callback func(msg 
Message, err error))
+       // Chan returns a channel to consume messages from
+       Chan() <-chan ConsumerMessage
 
        // Ack the consumption of a single message
        Ack(Message) error
@@ -151,34 +150,6 @@ type Consumer interface {
        // AckID the consumption of a single message, identified by its 
MessageID
        AckID(MessageID) error
 
-       // AckCumulative the reception of all the messages in the stream up to 
(and including) the provided message.
-       // This method will block until the acknowledge has been sent to the 
broker. After that, the messages will not be
-       // re-delivered to this consumer.
-       //
-       // Cumulative acknowledge cannot be used when the consumer type is set 
to ConsumerShared.
-       //
-       // It's equivalent to calling asyncAcknowledgeCumulative(Message) and 
waiting for the callback to be triggered.
-       AckCumulative(Message) error
-
-       // AckCumulativeID the reception of all the messages in the stream up 
to (and including) the provided message.
-       // This method will block until the acknowledge has been sent to the 
broker. After that, the messages will not be
-       // re-delivered to this consumer.
-       // Cumulative acknowledge cannot be used when the consumer type is set 
to ConsumerShared.
-       // It's equivalent to calling asyncAcknowledgeCumulative(MessageID) and 
waiting for the callback to be triggered.
-       AckCumulativeID(MessageID) error
-
        // Close the consumer and stop the broker to push more messages
        Close() error
-
-       // Seek reset the subscription associated with this consumer to a 
specific message id.
-       // The message id can either be a specific message or represent the 
first or last messages in the topic.
-       // Note: this operation can only be done on non-partitioned topics. For 
these, one can rather perform the
-       //       seek() on the individual partitions.
-       Seek(msgID MessageID) error
-
-       // RedeliverUnackedMessages redeliver all the unacknowledged messages. 
In Failover mode, the request is ignored if the consumer is not
-       // active for the given topic. In Shared mode, the consumers messages 
to be redelivered are distributed across all
-       // the connected consumers. This is a non blocking call and doesn't 
throw an exception. In case the connection
-       // breaks, the messages are redelivered after reconnect.
-       RedeliverUnackedMessages() error
 }
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
new file mode 100644
index 0000000..af9b8d5
--- /dev/null
+++ b/pulsar/consumer_impl.go
@@ -0,0 +1,278 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "math/rand"
+       "sync"
+       "time"
+
+       log "github.com/sirupsen/logrus"
+
+       "github.com/apache/pulsar-client-go/pkg/pb"
+       "github.com/apache/pulsar-client-go/pulsar/internal"
+)
+
+var ErrConsumerClosed = errors.New("consumer closed")
+
+type consumer struct {
+       topic string
+
+       options ConsumerOptions
+
+       consumers []*partitionConsumer
+
+       // channel used to deliver message to clients
+       messageCh chan ConsumerMessage
+
+       closeCh chan struct{}
+       errorCh chan error
+
+       log *log.Entry
+}
+
+func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
+       if options.Topic == "" && options.Topics == nil && 
options.TopicsPattern == "" {
+               return nil, newError(TopicNotFound, "topic is required")
+       }
+
+       if options.SubscriptionName == "" {
+               return nil, newError(SubscriptionNotFound, "subscription name 
is required for consumer")
+       }
+
+       if options.ReceiverQueueSize == 0 {
+               options.ReceiverQueueSize = 1000
+       }
+
+       // did the user pass in a message channel?
+       messageCh := options.MessageChannel
+       if options.MessageChannel == nil {
+               messageCh = make(chan ConsumerMessage, 10)
+       }
+
+       // single topic consumer
+       if options.Topic != "" || len(options.Topics) == 1 {
+               topic := options.Topic
+               if topic == "" {
+                       topic = options.Topics[0]
+               }
+
+               if _, err := internal.ParseTopicName(topic); err != nil {
+                       return nil, err
+               }
+
+               return topicSubscribe(client, options, topic, messageCh)
+       }
+
+       return nil, newError(ResultInvalidTopicName, "topic name is required 
for consumer")
+}
+
+func topicSubscribe(client *client, options ConsumerOptions, topic string,
+       messageCh chan ConsumerMessage) (Consumer, error) {
+       consumer := &consumer{
+               topic:     topic,
+               messageCh: messageCh,
+               errorCh:   make(chan error),
+               log:       log.WithField("topic", topic),
+       }
+
+       partitions, err := client.TopicPartitions(topic)
+       if err != nil {
+               return nil, err
+       }
+
+       numPartitions := len(partitions)
+       consumer.consumers = make([]*partitionConsumer, numPartitions)
+
+       type ConsumerError struct {
+               err       error
+               partition int
+               consumer  *partitionConsumer
+       }
+
+       consumerName := options.Name
+       if consumerName == "" {
+               consumerName = generateRandomName()
+       }
+
+       receiverQueueSize := options.ReceiverQueueSize
+       var wg sync.WaitGroup
+       ch := make(chan ConsumerError, numPartitions)
+       for partitionIdx, partitionTopic := range partitions {
+               wg.Add(1)
+               go func(idx int, pt string) {
+                       defer wg.Done()
+                       opts := &partitionConsumerOpts{
+                               topic:               pt,
+                               consumerName:        consumerName,
+                               subscription:        options.SubscriptionName,
+                               subscriptionType:    options.Type,
+                               subscriptionInitPos: 
options.SubscriptionInitialPosition,
+                               partitionIdx:        idx,
+                               receiverQueueSize:   receiverQueueSize,
+                       }
+                       cons, err := newPartitionConsumer(consumer, client, 
opts, messageCh)
+                       ch <- ConsumerError{
+                               err:       err,
+                               partition: idx,
+                               consumer:  cons,
+                       }
+               }(partitionIdx, partitionTopic)
+       }
+
+       go func() {
+               wg.Wait()
+               close(ch)
+       }()
+
+       for ce := range ch {
+               if ce.err != nil {
+                       err = ce.err
+               } else {
+                       consumer.consumers[ce.partition] = ce.consumer
+               }
+       }
+
+       if err != nil {
+               // Since there were some failures,
+               // cleanup all the partitions that succeeded in creating the 
consumer
+               for _, c := range consumer.consumers {
+                       if c != nil {
+                               _ = c.Close()
+                       }
+               }
+               return nil, err
+       }
+
+       return consumer, nil
+}
+
+func (c *consumer) Topic() string {
+       return c.topic
+}
+
+func (c *consumer) Subscription() string {
+       return c.options.SubscriptionName
+}
+
+func (c *consumer) Unsubscribe() error {
+       var errMsg string
+       for _, consumer := range c.consumers {
+               if err := consumer.Unsubscribe(); err != nil {
+                       errMsg += fmt.Sprintf("topic %s, subscription %s: %s", 
c.Topic(), c.Subscription(), err)
+               }
+       }
+       if errMsg != "" {
+               return fmt.Errorf(errMsg)
+       }
+       return nil
+}
+
+func (c *consumer) Receive(ctx context.Context) (message Message, err error) {
+       for {
+               select {
+               case cm, ok := <-c.messageCh:
+                       if !ok {
+                               return nil, ErrConsumerClosed
+                       }
+                       return cm.Message, nil
+               case <-ctx.Done():
+                       return nil, ctx.Err()
+               }
+       }
+}
+
+// Messages
+func (c *consumer) Chan() <-chan ConsumerMessage {
+       return c.messageCh
+}
+
+// Ack the consumption of a single message
+func (c *consumer) Ack(msg Message) error {
+       return c.AckID(msg.ID())
+}
+
+// Ack the consumption of a single message, identified by its MessageID
+func (c *consumer) AckID(msgID MessageID) error {
+       mid, ok := msgID.(*messageID)
+       if !ok {
+               return fmt.Errorf("invalid message id type")
+       }
+
+       partition := mid.partitionIdx
+       // did we receive a valid partition index?
+       if partition < 0 || partition >= len(c.consumers) {
+               return fmt.Errorf("invalid partition index %d expected a 
partition between [0-%d]",
+                       partition, len(c.consumers))
+       }
+       return c.consumers[partition].AckID(msgID)
+}
+
+func (c *consumer) Close() error {
+       var wg sync.WaitGroup
+       for i := range c.consumers {
+               wg.Add(1)
+               go func(pc *partitionConsumer) {
+                       defer wg.Done()
+                       pc.Close()
+               }(c.consumers[i])
+       }
+       wg.Wait()
+
+       return nil
+}
+
+var random = rand.New(rand.NewSource(time.Now().UnixNano()))
+
+func generateRandomName() string {
+       chars := "abcdefghijklmnopqrstuvwxyz"
+       bytes := make([]byte, 5)
+       for i := range bytes {
+               bytes[i] = chars[random.Intn(len(chars))]
+       }
+       return string(bytes)
+}
+
+func toProtoSubType(st SubscriptionType) pb.CommandSubscribe_SubType {
+       switch st {
+       case Exclusive:
+               return pb.CommandSubscribe_Exclusive
+       case Shared:
+               return pb.CommandSubscribe_Shared
+       case Failover:
+               return pb.CommandSubscribe_Failover
+       case KeyShared:
+               return pb.CommandSubscribe_Key_Shared
+       }
+
+       return pb.CommandSubscribe_Exclusive
+}
+
+func toProtoInitialPosition(p SubscriptionInitialPosition) 
pb.CommandSubscribe_InitialPosition {
+       switch p {
+       case SubscriptionPositionLatest:
+               return pb.CommandSubscribe_Latest
+       case SubscriptionPositionEarliest:
+               return pb.CommandSubscribe_Earliest
+       }
+
+       return pb.CommandSubscribe_Latest
+}
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
new file mode 100644
index 0000000..ae4889b
--- /dev/null
+++ b/pulsar/consumer_partition.go
@@ -0,0 +1,510 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       "fmt"
+       "math"
+       "time"
+
+       "github.com/golang/protobuf/proto"
+
+       log "github.com/sirupsen/logrus"
+
+       "github.com/apache/pulsar-client-go/pkg/pb"
+       "github.com/apache/pulsar-client-go/pulsar/internal"
+)
+
+type consumerState int
+
+const (
+       consumerInit consumerState = iota
+       consumerReady
+       consumerClosing
+       consumerClosed
+)
+
+type partitionConsumerOpts struct {
+       topic               string
+       consumerName        string
+       subscription        string
+       subscriptionType    SubscriptionType
+       subscriptionInitPos SubscriptionInitialPosition
+       partitionIdx        int
+       receiverQueueSize   int
+}
+
+type partitionConsumer struct {
+       client *client
+
+       // this is needed for sending ConsumerMessage on the messageCh
+       parentConsumer Consumer
+       state          consumerState
+       options        *partitionConsumerOpts
+
+       conn internal.Connection
+
+       topic        string
+       name         string
+       consumerID   uint64
+       partitionIdx int
+
+       // shared channel
+       messageCh chan ConsumerMessage
+
+       // the number of message slots available
+       availablePermits int32
+
+       // the size of the queue channel for buffering messages
+       queueSize int32
+       queueCh   chan []*message
+
+       eventsCh    chan interface{}
+       connectedCh chan struct{}
+       closeCh     chan struct{}
+
+       log *log.Entry
+}
+
+func newPartitionConsumer(parent Consumer, client *client, options 
*partitionConsumerOpts,
+       messageCh chan ConsumerMessage) (*partitionConsumer, error) {
+       pc := &partitionConsumer{
+               state:          consumerInit,
+               parentConsumer: parent,
+               client:         client,
+               options:        options,
+               topic:          options.topic,
+               name:           options.consumerName,
+               consumerID:     client.rpcClient.NewConsumerID(),
+               partitionIdx:   options.partitionIdx,
+               eventsCh:       make(chan interface{}, 3),
+               queueSize:      int32(options.receiverQueueSize),
+               queueCh:        make(chan []*message, 
options.receiverQueueSize),
+               connectedCh:    make(chan struct{}),
+               messageCh:      messageCh,
+               closeCh:        make(chan struct{}),
+               log:            log.WithField("topic", options.topic),
+       }
+       pc.log = pc.log.WithField("name", pc.name).WithField("subscription", 
options.subscription)
+
+       err := pc.grabConn()
+       if err != nil {
+               log.WithError(err).Errorf("Failed to create consumer")
+               return nil, err
+       }
+       pc.log.Info("Created consumer")
+       pc.state = consumerReady
+
+       go pc.dispatcher()
+
+       go pc.runEventsLoop()
+
+       return pc, nil
+}
+
+func (pc *partitionConsumer) Unsubscribe() error {
+       req := &unsubscribeRequest{doneCh: make(chan struct{})}
+       pc.eventsCh <- req
+
+       // wait for the request to complete
+       <-req.doneCh
+       return req.err
+}
+
+func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
+       defer close(unsub.doneCh)
+
+       requestID := pc.client.rpcClient.NewRequestID()
+       cmdUnsubscribe := &pb.CommandUnsubscribe{
+               RequestId:  proto.Uint64(requestID),
+               ConsumerId: proto.Uint64(pc.consumerID),
+       }
+       _, err := pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, 
pb.BaseCommand_UNSUBSCRIBE, cmdUnsubscribe)
+       if err != nil {
+               pc.log.WithError(err).Error("Failed to unsubscribe consumer")
+               unsub.err = err
+       }
+
+       pc.conn.DeleteConsumeHandler(pc.consumerID)
+}
+
+func (pc *partitionConsumer) Ack(msg Message) error {
+       return pc.AckID(msg.ID())
+}
+
+func (pc *partitionConsumer) AckID(msgID MessageID) error {
+       req := &ackRequest{
+               doneCh: make(chan struct{}),
+               msgID:  msgID,
+       }
+       pc.eventsCh <- req
+
+       <-req.doneCh
+       return req.err
+}
+
+func (pc *partitionConsumer) Close() error {
+       if pc.state != consumerReady {
+               return nil
+       }
+
+       req := &closeRequest{doneCh: make(chan struct{})}
+       pc.eventsCh <- req
+
+       // wait for request to finish
+       <-req.doneCh
+       return req.err
+}
+
+func (pc *partitionConsumer) internalAck(req *ackRequest) {
+       defer close(req.doneCh)
+
+       id := &pb.MessageIdData{}
+       messageIDs := make([]*pb.MessageIdData, 0)
+       err := proto.Unmarshal(req.msgID.Serialize(), id)
+       if err != nil {
+               pc.log.WithError(err).Error("unable to serialize message id")
+               req.err = err
+       }
+
+       messageIDs = append(messageIDs, id)
+       requestID := internal.RequestIDNoResponse
+       cmdAck := &pb.CommandAck{
+               ConsumerId: proto.Uint64(pc.consumerID),
+               MessageId:  messageIDs,
+               AckType:    pb.CommandAck_Individual.Enum(),
+       }
+       _, err = pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, requestID, 
pb.BaseCommand_ACK, cmdAck)
+       if err != nil {
+               pc.log.WithError(err).Errorf("failed to ack message_id=%s", id)
+               req.err = err
+       }
+}
+
+func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, 
headersAndPayload internal.Buffer) error {
+       pbMsgID := response.GetMessageId()
+
+       reader := internal.NewMessageReader(headersAndPayload.ReadableSlice())
+       msgMeta, err := reader.ReadMessageMetadata()
+       if err != nil {
+               // TODO send discardCorruptedMessage
+               return err
+       }
+
+       numMsgs := 1
+       if msgMeta.NumMessagesInBatch != nil {
+               numMsgs = int(msgMeta.GetNumMessagesInBatch())
+       }
+       messages := make([]*message, numMsgs)
+       for i := 0; i < numMsgs; i++ {
+               smm, payload, err := reader.ReadMessage()
+               if err != nil {
+                       // TODO send corrupted message
+                       return err
+               }
+
+               msgID := newMessageID(int64(pbMsgID.GetLedgerId()), 
int64(pbMsgID.GetEntryId()), i, pc.partitionIdx)
+               var msg *message
+               if smm != nil {
+                       msg = &message{
+                               publishTime: 
timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+                               eventTime:   
timeFromUnixTimestampMillis(smm.GetEventTime()),
+                               key:         smm.GetPartitionKey(),
+                               properties:  
internal.ConvertToStringMap(smm.GetProperties()),
+                               topic:       pc.topic,
+                               msgID:       msgID,
+                               payLoad:     payload,
+                       }
+               } else {
+                       msg = &message{
+                               publishTime: 
timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+                               eventTime:   
timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
+                               key:         msgMeta.GetPartitionKey(),
+                               properties:  
internal.ConvertToStringMap(msgMeta.GetProperties()),
+                               topic:       pc.topic,
+                               msgID:       msgID,
+                               payLoad:     payload,
+                       }
+               }
+
+               messages[i] = msg
+       }
+
+       // send messages to the dispatcher
+       pc.queueCh <- messages
+       return nil
+}
+
+func (pc *partitionConsumer) ConnectionClosed() {
+       // Trigger reconnection in the consumer goroutine
+       pc.eventsCh <- &connectionClosed{}
+}
+
+// Flow command gives additional permits to send messages to the consumer.
+// A typical consumer implementation will use a queue to accumulate these 
messages
+// before the application is ready to consume them. After the consumer is 
ready,
+// the client needs to give permission to the broker to push messages.
+func (pc *partitionConsumer) internalFlow(permits uint32) error {
+       if permits == 0 {
+               return fmt.Errorf("invalid number of permits requested: %d", 
permits)
+       }
+
+       requestID := internal.RequestIDNoResponse
+       cmdFlow := &pb.CommandFlow{
+               ConsumerId:     proto.Uint64(pc.consumerID),
+               MessagePermits: proto.Uint32(permits),
+       }
+       _, err := pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, requestID, 
pb.BaseCommand_FLOW, cmdFlow)
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+// dispatcher manages the internal message queue channel
+// and manages the flow control
+func (pc *partitionConsumer) dispatcher() {
+       defer func() {
+               pc.log.Info("exiting dispatch loop")
+       }()
+       var messages []*message
+       for {
+               var queueCh chan []*message
+               var messageCh chan ConsumerMessage
+               var nextMessage ConsumerMessage
+
+               // are there more messages to send?
+               if len(messages) > 0 {
+                       nextMessage = ConsumerMessage{
+                               Consumer: pc.parentConsumer,
+                               Message:  messages[0],
+                       }
+                       messageCh = pc.messageCh
+               } else {
+                       // we are ready for more messages
+                       queueCh = pc.queueCh
+               }
+
+               select {
+               case <-pc.closeCh:
+                       return
+
+               case _, ok := <-pc.connectedCh:
+                       if !ok {
+                               return
+                       }
+                       pc.log.Debug("dispatcher received connection event")
+
+                       // drain messages
+                       messages = nil
+
+                       // drain the message queue on any new connection by 
sending a
+                       // special nil message to the channel so we know when 
to stop dropping messages
+                       go func() {
+                               pc.queueCh <- nil
+                       }()
+                       for m := range pc.queueCh {
+                               // the queue has been drained
+                               if m == nil {
+                                       break
+                               }
+                       }
+
+                       // reset available permits
+                       pc.availablePermits = 0
+                       initialPermits := uint32(pc.queueSize)
+
+                       pc.log.Debugf("dispatcher requesting initial 
permits=%d", initialPermits)
+                       // send initial permits
+                       if err := pc.internalFlow(initialPermits); err != nil {
+                               pc.log.WithError(err).Error("unable to send 
initial permits to broker")
+                       }
+
+               case msgs, ok := <-queueCh:
+                       if !ok {
+                               return
+                       }
+                       // we only read messages here after the consumer has 
processed all messages
+                       // in the previous batch
+                       messages = msgs
+
+               // if the messageCh is nil or the messageCh is full this will 
not be selected
+               case messageCh <- nextMessage:
+                       // allow this message to be garbage collected
+                       messages[0] = nil
+                       messages = messages[1:]
+
+                       // TODO implement a better flow controller
+                       // send more permits if needed
+                       pc.availablePermits++
+                       flowThreshold := 
int32(math.Max(float64(pc.queueSize/2), 1))
+                       if pc.availablePermits >= flowThreshold {
+                               availablePermits := pc.availablePermits
+                               requestedPermits := availablePermits
+                               pc.availablePermits = 0
+
+                               pc.log.Debugf("requesting more permits=%d 
available=%d", requestedPermits, availablePermits)
+                               if err := 
pc.internalFlow(uint32(requestedPermits)); err != nil {
+                                       pc.log.WithError(err).Error("unable to 
send permits")
+                               }
+                       }
+               }
+       }
+}
+
+type ackRequest struct {
+       doneCh chan struct{}
+       msgID  MessageID
+       err    error
+}
+
+type unsubscribeRequest struct {
+       doneCh chan struct{}
+       err    error
+}
+
+type closeRequest struct {
+       doneCh chan struct{}
+       err    error
+}
+
+func (pc *partitionConsumer) runEventsLoop() {
+       defer func() {
+               pc.log.Info("exiting events loop")
+       }()
+       for {
+               select {
+               case <-pc.closeCh:
+                       return
+               case i := <-pc.eventsCh:
+                       switch v := i.(type) {
+                       case *ackRequest:
+                               pc.internalAck(v)
+                       case *unsubscribeRequest:
+                               pc.internalUnsubscribe(v)
+                       case *connectionClosed:
+                               pc.reconnectToBroker()
+                       case *closeRequest:
+                               pc.internalClose(v)
+                               return
+                       }
+               }
+       }
+}
+
+func (pc *partitionConsumer) internalClose(req *closeRequest) {
+       defer close(req.doneCh)
+       if pc.state != consumerReady {
+               return
+       }
+
+       pc.state = consumerClosing
+       pc.log.Infof("Closing consumer=%d", pc.consumerID)
+
+       requestID := pc.client.rpcClient.NewConsumerID()
+       cmdClose := &pb.CommandCloseConsumer{
+               ConsumerId: proto.Uint64(pc.consumerID),
+               RequestId:  proto.Uint64(requestID),
+       }
+       _, err := pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, 
pb.BaseCommand_CLOSE_CONSUMER, cmdClose)
+       if err != nil {
+               req.err = err
+       } else {
+               pc.log.Info("Closed consumer")
+               pc.state = consumerClosed
+               pc.conn.DeleteConsumeHandler(pc.consumerID)
+               close(pc.closeCh)
+       }
+}
+
+func (pc *partitionConsumer) reconnectToBroker() {
+       backoff := internal.Backoff{}
+       for {
+               if pc.state != consumerReady {
+                       // Consumer is already closing
+                       return
+               }
+
+               d := backoff.Next()
+               pc.log.Info("Reconnecting to broker in ", d)
+               time.Sleep(d)
+
+               err := pc.grabConn()
+               if err == nil {
+                       // Successfully reconnected
+                       pc.log.Info("Reconnected consumer to broker")
+                       return
+               }
+       }
+}
+
+func (pc *partitionConsumer) grabConn() error {
+       lr, err := pc.client.lookupService.Lookup(pc.topic)
+       if err != nil {
+               pc.log.WithError(err).Warn("Failed to lookup topic")
+               return err
+       }
+       pc.log.Debugf("Lookup result: %+v", lr)
+
+       subType := toProtoSubType(pc.options.subscriptionType)
+       initialPosition := 
toProtoInitialPosition(pc.options.subscriptionInitPos)
+       requestID := pc.client.rpcClient.NewRequestID()
+       cmdSubscribe := &pb.CommandSubscribe{
+               RequestId:       proto.Uint64(requestID),
+               Topic:           proto.String(pc.topic),
+               SubType:         subType.Enum(),
+               Subscription:    proto.String(pc.options.subscription),
+               ConsumerId:      proto.Uint64(pc.consumerID),
+               ConsumerName:    proto.String(pc.name),
+               InitialPosition: initialPosition.Enum(),
+               Schema:          nil,
+       }
+
+       res, err := pc.client.rpcClient.Request(lr.LogicalAddr, 
lr.PhysicalAddr, requestID,
+               pb.BaseCommand_SUBSCRIBE, cmdSubscribe)
+
+       if err != nil {
+               pc.log.WithError(err).Error("Failed to create consumer")
+               return err
+       }
+
+       if res.Response.ConsumerStatsResponse != nil {
+               pc.name = res.Response.ConsumerStatsResponse.GetConsumerName()
+       }
+
+       pc.conn = res.Cnx
+       pc.log.Info("Connected consumer")
+       pc.conn.AddConsumeHandler(pc.consumerID, pc)
+
+       msgType := res.Response.GetType()
+
+       switch msgType {
+       case pb.BaseCommand_SUCCESS:
+               // notify the dispatcher we have connection
+               go func() {
+                       pc.connectedCh <- struct{}{}
+               }()
+               return nil
+       case pb.BaseCommand_ERROR:
+               errMsg := res.Response.GetError()
+               return fmt.Errorf("%s: %s", errMsg.GetError().String(), 
errMsg.GetMessage())
+       default:
+               return newUnexpectedErrMsg(msgType, requestID)
+       }
+}
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index e6f4d52..bbc1315 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -22,11 +22,9 @@ import (
        "fmt"
        "log"
        "net/http"
-       "strings"
        "testing"
        "time"
 
-       "github.com/apache/pulsar-client-go/util"
        "github.com/stretchr/testify/assert"
 )
 
@@ -96,11 +94,6 @@ func TestProducerConsumer(t *testing.T) {
                        log.Fatal(err)
                }
        }
-
-       // unsubscribe consumer
-       if err := consumer.Unsubscribe(); err != nil {
-               log.Fatal(err)
-       }
 }
 
 func TestConsumerConnectError(t *testing.T) {
@@ -176,13 +169,6 @@ func TestBatchMessageReceive(t *testing.T) {
                count++
        }
 
-       // check strategically
-       for i := 0; i < 3; i++ {
-               if count == numOfMessages {
-                       break
-               }
-               time.Sleep(time.Second)
-       }
        assert.Equal(t, count, numOfMessages)
 }
 
@@ -202,7 +188,7 @@ func TestConsumerWithInvalidConf(t *testing.T) {
                Topic: "my-topic",
        })
 
-       // Expect error in creating cosnumer
+       // Expect error in creating consumer
        assert.Nil(t, consumer)
        assert.NotNil(t, err)
 
@@ -220,7 +206,7 @@ func TestConsumerWithInvalidConf(t *testing.T) {
        assert.Equal(t, err.(*Error).Result(), TopicNotFound)
 }
 
-func TestConsumer_SubscriptionEarliestPos(t *testing.T) {
+func TestConsumerSubscriptionEarliestPosition(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,
        })
@@ -238,9 +224,8 @@ func TestConsumer_SubscriptionEarliestPos(t *testing.T) {
        assert.Nil(t, err)
        defer producer.Close()
 
-       //sent message
+       // send message
        ctx := context.Background()
-
        err = producer.Send(ctx, &ProducerMessage{
                Payload: []byte("msg-1-content-1"),
        })
@@ -253,9 +238,9 @@ func TestConsumer_SubscriptionEarliestPos(t *testing.T) {
 
        // create consumer
        consumer, err := client.Subscribe(ConsumerOptions{
-               Topic:               topicName,
-               SubscriptionName:    subName,
-               SubscriptionInitPos: Earliest,
+               Topic:                       topicName,
+               SubscriptionName:            subName,
+               SubscriptionInitialPosition: SubscriptionPositionEarliest,
        })
        assert.Nil(t, err)
        defer consumer.Close()
@@ -266,24 +251,6 @@ func TestConsumer_SubscriptionEarliestPos(t *testing.T) {
        assert.Equal(t, "msg-1-content-1", string(msg.Payload()))
 }
 
-func makeHTTPCall(t *testing.T, method string, urls string, body string) {
-       client := http.Client{}
-
-       req, err := http.NewRequest(method, urls, strings.NewReader(body))
-       if err != nil {
-               t.Fatal(err)
-       }
-
-       req.Header.Set("Content-Type", "application/json")
-       req.Header.Set("Accept", "application/json")
-
-       res, err := client.Do(req)
-       if err != nil {
-               t.Fatal(err)
-       }
-       defer res.Body.Close()
-}
-
 func TestConsumerKeyShared(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,
@@ -311,7 +278,8 @@ func TestConsumerKeyShared(t *testing.T) {
 
        // create producer
        producer, err := client.CreateProducer(ProducerOptions{
-               Topic: topic,
+               Topic:           topic,
+               DisableBatching: true,
        })
        assert.Nil(t, err)
        defer producer.Close()
@@ -325,33 +293,32 @@ func TestConsumerKeyShared(t *testing.T) {
                assert.Nil(t, err)
        }
 
-       time.Sleep(time.Second * 1)
-
-       go func() {
-               for i := 0; i < 10; i++ {
-                       msg, err := consumer1.Receive(ctx)
-                       assert.Nil(t, err)
-                       if msg != nil {
-                               fmt.Printf("consumer1 key is: %s, value is: 
%s\n", msg.Key(), string(msg.Payload()))
-                               err = consumer1.Ack(msg)
-                               assert.Nil(t, err)
+       receivedConsumer1 := 0
+       receivedConsumer2 := 0
+       for (receivedConsumer1 + receivedConsumer2) < 10 {
+               select {
+               case cm, ok := <-consumer1.Chan():
+                       if !ok {
+                               break
                        }
-               }
-       }()
-
-       go func() {
-               for i := 0; i < 10; i++ {
-                       msg2, err := consumer2.Receive(ctx)
-                       assert.Nil(t, err)
-                       if msg2 != nil {
-                               fmt.Printf("consumer2 key is:%s, value is: 
%s\n", msg2.Key(), string(msg2.Payload()))
-                               err = consumer2.Ack(msg2)
-                               assert.Nil(t, err)
+                       receivedConsumer1++
+                       if err := consumer1.Ack(cm.Message); err != nil {
+                               log.Fatal(err)
+                       }
+               case cm, ok := <-consumer2.Chan():
+                       if !ok {
+                               break
+                       }
+                       receivedConsumer2++
+                       if err := consumer2.Ack(cm.Message); err != nil {
+                               log.Fatal(err)
                        }
                }
-       }()
+       }
 
-       time.Sleep(time.Second * 1)
+       fmt.Printf("TestConsumerKeyShared received messages consumer1: %d 
consumer2: %d\n",
+               receivedConsumer1, receivedConsumer2)
+       assert.Equal(t, 10, receivedConsumer1+receivedConsumer2)
 }
 
 func TestPartitionTopicsConsumerPubSub(t *testing.T) {
@@ -414,77 +381,16 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
        assert.Equal(t, len(msgs), 10)
 }
 
-func TestConsumer_ReceiveAsync(t *testing.T) {
+func TestConsumerReceiveTimeout(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,
        })
-
        assert.Nil(t, err)
        defer client.Close()
 
-       topicName := "persistent://public/default/receive-async"
-       subName := "subscription-receive-async"
-       ctx := context.Background()
-       ch := make(chan ConsumerMessage, 10)
-
-       // create producer
-       producer, err := client.CreateProducer(ProducerOptions{
-               Topic: topicName,
-       })
-       assert.Nil(t, err)
-       defer producer.Close()
-
-       consumer, err := client.Subscribe(ConsumerOptions{
-               Topic:            topicName,
-               SubscriptionName: subName,
-       })
-       assert.Nil(t, err)
-       defer consumer.Close()
-
-       //send 10 messages
-       for i := 0; i < 10; i++ {
-               err = producer.Send(ctx, &ProducerMessage{
-                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
-               })
-               assert.Nil(t, err)
-       }
-
-       //receive async 10 messages
-       err = consumer.ReceiveAsync(ctx, ch)
-       assert.Nil(t, err)
-
-       payloadList := make([]string, 0, 10)
-
-RECEIVE:
-       for {
-               select {
-               case cMsg, ok := <-ch:
-                       if ok {
-                               fmt.Printf("receive message payload is:%s\n", 
string(cMsg.Payload()))
-                               assert.Equal(t, topicName, cMsg.Message.Topic())
-                               assert.Equal(t, topicName, 
cMsg.Consumer.Topic())
-                               payloadList = append(payloadList, 
string(cMsg.Message.Payload()))
-                               if len(payloadList) == 10 {
-                                       break RECEIVE
-                               }
-                       }
-                       continue RECEIVE
-               case <-ctx.Done():
-                       t.Error("context error.")
-                       return
-               }
-       }
-}
-
-func TestConsumerAckTimeout(t *testing.T) {
-       client, err := NewClient(ClientOptions{
-               URL: lookupURL,
-       })
-       assert.Nil(t, err)
-       defer client.Close()
-
-       topic := "test-ack-timeout-topic-1"
-       ctx := context.Background()
+       topic := "test-topic-with-no-messages"
+       ctx, cancel := context.WithTimeout(context.Background(), 
500*time.Millisecond)
+       defer cancel()
 
        // create consumer
        consumer, err := client.Subscribe(ConsumerOptions{
@@ -496,155 +402,12 @@ func TestConsumerAckTimeout(t *testing.T) {
        assert.Nil(t, err)
        defer consumer.Close()
 
-       // create consumer1
-       consumer1, err := client.Subscribe(ConsumerOptions{
-               Topic:            topic,
-               SubscriptionName: "my-sub2",
-               Type:             Shared,
-               AckTimeout:       5 * 1000,
-       })
-       assert.Nil(t, err)
-       defer consumer1.Close()
-
-       // create producer
-       producer, err := client.CreateProducer(ProducerOptions{
-               Topic:           topic,
-               DisableBatching: true,
-       })
-       assert.Nil(t, err)
-       defer producer.Close()
-
-       // send 10 messages
-       for i := 0; i < 10; i++ {
-               if err := producer.Send(ctx, &ProducerMessage{
-                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
-               }); err != nil {
-                       log.Fatal(err)
-               }
-       }
-
-       // consumer receive 10 messages
-       payloadList := make([]string, 0, 10)
-       for i := 0; i < 10; i++ {
-               msg, err := consumer.Receive(context.Background())
-               if err != nil {
-                       log.Fatal(err)
-               }
-               payloadList = append(payloadList, string(msg.Payload()))
-
-               // not ack message
-       }
-       assert.Equal(t, 10, len(payloadList))
-
-       // consumer1 receive 10 messages
-       for i := 0; i < 10; i++ {
-               msg, err := consumer1.Receive(context.Background())
-               if err != nil {
-                       log.Fatal(err)
-               }
-
-               payloadList = append(payloadList, string(msg.Payload()))
-
-               // ack half of the messages
-               if i%2 == 0 {
-                       err = consumer1.Ack(msg)
-                       assert.Nil(t, err)
-               }
-       }
-
-       // wait ack timeout
-       time.Sleep(6 * time.Second)
-
-       fmt.Println("start redeliver messages...")
-
-       payloadList = make([]string, 0, 10)
-       // consumer receive messages again
-       for i := 0; i < 10; i++ {
-               msg, err := consumer.Receive(context.Background())
-               if err != nil {
-                       log.Fatal(err)
-               }
-               payloadList = append(payloadList, string(msg.Payload()))
-
-               // ack message
-               if err := consumer.Ack(msg); err != nil {
-                       log.Fatal(err)
-               }
-       }
-       assert.Equal(t, 10, len(payloadList))
-
-       payloadList = make([]string, 0, 5)
-       // consumer1 receive messages again
-       go func() {
-               for i := 0; i < 10; i++ {
-                       msg, err := consumer1.Receive(context.Background())
-                       if err != nil {
-                               log.Fatal(err)
-                       }
-
-                       expectMsg := fmt.Sprintf("hello-%d", i)
-                       fmt.Printf("redeliver messages, payload is:%s\n", 
expectMsg)
-                       payloadList = append(payloadList, string(msg.Payload()))
-
-                       // ack message
-                       if err := consumer1.Ack(msg); err != nil {
-                               log.Fatal(err)
-                       }
-               }
-               assert.Equal(t, 5, len(payloadList))
-       }()
-
-       // sleep 2 seconds, wait gorutine receive messages.
-       time.Sleep(time.Second * 2)
-}
-
-func TestConsumer_ReceiveAsyncWithCallback(t *testing.T) {
-       client, err := NewClient(ClientOptions{
-               URL: lookupURL,
-       })
-
-       assert.Nil(t, err)
-       defer client.Close()
-
-       topicName := "persistent://public/default/receive-async-with-callback"
-       subName := "subscription-receive-async"
-       ctx := context.Background()
-
-       // create producer
-       producer, err := client.CreateProducer(ProducerOptions{
-               Topic: topicName,
-       })
-       assert.Nil(t, err)
-       defer producer.Close()
-
-       consumer, err := client.Subscribe(ConsumerOptions{
-               Topic:            topicName,
-               SubscriptionName: subName,
-       })
-       assert.Nil(t, err)
-       defer consumer.Close()
-
-       //send 10 messages
-       for i := 0; i < 10; i++ {
-               err := producer.Send(ctx, &ProducerMessage{
-                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
-               })
-               assert.Nil(t, err)
-       }
-
-       for i := 0; i < 10; i++ {
-               tmpMsg := fmt.Sprintf("hello-%d", i)
-               consumer.ReceiveAsyncWithCallback(ctx, func(msg Message, err 
error) {
-                       if err != nil {
-                               log.Fatal(err)
-                       }
-                       fmt.Printf("receive message payload is:%s\n", 
string(msg.Payload()))
-                       assert.Equal(t, tmpMsg, string(msg.Payload()))
-               })
-       }
+       msg, err := consumer.Receive(ctx)
+       assert.Nil(t, msg)
+       assert.NotNil(t, err)
 }
 
-func TestConsumer_Shared(t *testing.T) {
+func TestConsumerShared(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,
        })
@@ -682,116 +445,49 @@ func TestConsumer_Shared(t *testing.T) {
        assert.Nil(t, err)
        defer producer.Close()
 
-       // send 10 messages
+       // send 10 messages with unique payloads
        for i := 0; i < 10; i++ {
                if err := producer.Send(context.Background(), &ProducerMessage{
                        Payload: []byte(fmt.Sprintf("hello-%d", i)),
                }); err != nil {
                        log.Fatal(err)
                }
+               fmt.Println("sending message:", fmt.Sprintf("hello-%d", i))
        }
 
-       msgList := make([]string, 0, 5)
-       for i := 0; i < 5; i++ {
-               msg, err := consumer1.Receive(context.Background())
-               if err != nil {
-                       log.Fatal(err)
-               }
-               fmt.Printf("consumer1 msg id is: %v, value is: %s\n", msg.ID(), 
string(msg.Payload()))
-               msgList = append(msgList, string(msg.Payload()))
-               if err := consumer1.Ack(msg); err != nil {
-                       log.Fatal(err)
-               }
-       }
-
-       assert.Equal(t, 5, len(msgList))
-
-       for i := 0; i < 5; i++ {
-               msg, err := consumer2.Receive(context.Background())
-               if err != nil {
-                       log.Fatal(err)
-               }
-               if err := consumer2.Ack(msg); err != nil {
-                       log.Fatal(err)
-               }
-               fmt.Printf("consumer2 msg id is: %v, value is: %s\n", msg.ID(), 
string(msg.Payload()))
-               msgList = append(msgList, string(msg.Payload()))
-       }
-
-       assert.Equal(t, 10, len(msgList))
-       res := util.RemoveDuplicateElement(msgList)
-       assert.Equal(t, 10, len(res))
-}
-
-func TestConsumer_Seek(t *testing.T) {
-       client, err := NewClient(ClientOptions{
-               URL: lookupURL,
-       })
-
-       assert.Nil(t, err)
-       defer client.Close()
-
-       topicName := "persistent://public/default/testSeek"
-       testURL := adminURL + "/" + 
"admin/v2/persistent/public/default/testSeek"
-       makeHTTPCall(t, http.MethodPut, testURL, "1")
-       subName := "sub-testSeek"
-
-       producer, err := client.CreateProducer(ProducerOptions{
-               Topic: topicName,
-       })
-       assert.Nil(t, err)
-       assert.Equal(t, producer.Topic(), topicName)
-       defer producer.Close()
-
-       consumer, err := client.Subscribe(ConsumerOptions{
-               Topic:            topicName,
-               SubscriptionName: subName,
-       })
-       assert.Nil(t, err)
-       assert.Equal(t, consumer.Topic(), topicName)
-       assert.Equal(t, consumer.Subscription(), subName)
-       defer consumer.Close()
-
-       ctx := context.Background()
-
-       // Send 10 messages synchronously
-       t.Log("Publishing 10 messages synchronously")
-       for msgNum := 0; msgNum < 10; msgNum++ {
-               if err := producer.Send(ctx, &ProducerMessage{
-                       Payload: []byte(fmt.Sprintf("msg-content-%d", msgNum)),
-               }); err != nil {
-                       t.Fatal(err)
-               }
-       }
-
-       t.Log("Trying to receive 10 messages")
-       idList := make([]MessageID, 0, 10)
-       for msgNum := 0; msgNum < 10; msgNum++ {
-               msg, err := consumer.Receive(ctx)
-               assert.Nil(t, err)
-               idList = append(idList, msg.ID())
-               fmt.Println(string(msg.Payload()))
-       }
-
-       for index, id := range idList {
-               if index == 4 {
-                       // seek to fourth message, expected receive fourth 
message.
-                       err = consumer.Seek(id)
-                       assert.Nil(t, err)
-                       break
+       readMsgs := 0
+       messages := make(map[string]struct{})
+       for readMsgs < 10 {
+               select {
+               case cm, ok := <-consumer1.Chan():
+                       if !ok {
+                               break
+                       }
+                       readMsgs++
+                       payload := string(cm.Message.Payload())
+                       messages[payload] = struct{}{}
+                       fmt.Printf("consumer1 msg id is: %v, value is: %s\n", 
cm.Message.ID(), payload)
+                       if err := consumer1.Ack(cm.Message); err != nil {
+                               log.Fatal(err)
+                       }
+               case cm, ok := <-consumer2.Chan():
+                       if !ok {
+                               break
+                       }
+                       readMsgs++
+                       payload := string(cm.Message.Payload())
+                       messages[payload] = struct{}{}
+                       fmt.Printf("consumer2 msg id is: %v, value is: %s\n", 
cm.Message.ID(), payload)
+                       if err := consumer2.Ack(cm.Message); err != nil {
+                               log.Fatal(err)
+                       }
                }
        }
 
-       // Sleeping for 500ms to wait for consumer re-connect
-       time.Sleep(500 * time.Millisecond)
-
-       msg, err := consumer.Receive(ctx)
-       assert.Nil(t, err)
-       t.Logf("again received message:%+v", msg.ID())
-       assert.Equal(t, "msg-content-4", string(msg.Payload()))
+       assert.Equal(t, 10, len(messages))
 }
 
-func TestConsumer_EventTime(t *testing.T) {
+func TestConsumerEventTime(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,
        })
@@ -828,7 +524,7 @@ func TestConsumer_EventTime(t *testing.T) {
        assert.Equal(t, "test", string(msg.Payload()))
 }
 
-func TestConsumer_Flow(t *testing.T) {
+func TestConsumerFlow(t *testing.T) {
        client, err := NewClient(ClientOptions{
                URL: lookupURL,
        })
diff --git a/util/error.go b/pulsar/helper.go
similarity index 84%
rename from util/error.go
rename to pulsar/helper.go
index 755b7c0..83975f4 100644
--- a/util/error.go
+++ b/pulsar/helper.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package util
+package pulsar
 
 import (
        "fmt"
@@ -26,24 +26,24 @@ import (
 // NewUnexpectedErrMsg instantiates an ErrUnexpectedMsg error.
 // Optionally provide a list of IDs associated with the message
 // for additional context in the error message.
-func NewUnexpectedErrMsg(msgType pb.BaseCommand_Type, ids ...interface{}) 
*UnexpectedErrMsg {
-       return &UnexpectedErrMsg{
+func newUnexpectedErrMsg(msgType pb.BaseCommand_Type, ids ...interface{}) 
*unexpectedErrMsg {
+       return &unexpectedErrMsg{
                msgType: msgType,
                ids:     ids,
        }
 }
 
 // UnexpectedErrMsg is returned when an unexpected message is received.
-type UnexpectedErrMsg struct {
+type unexpectedErrMsg struct {
        msgType pb.BaseCommand_Type
        ids     []interface{}
 }
 
 // Error satisfies the error interface.
-func (e *UnexpectedErrMsg) Error() string {
+func (e *unexpectedErrMsg) Error() string {
        msg := fmt.Sprintf("received unexpected message of type %q", 
e.msgType.String())
        for _, id := range e.ids {
-               msg += fmt.Sprintf(" id=%v", id)
+               msg += fmt.Sprintf(" consumerID=%v", id)
        }
        return msg
 }
diff --git a/pulsar/impl_client.go b/pulsar/impl_client.go
index 7077155..53c2074 100644
--- a/pulsar/impl_client.go
+++ b/pulsar/impl_client.go
@@ -21,12 +21,13 @@ import (
        "fmt"
        "net/url"
 
-       "github.com/apache/pulsar-client-go/pkg/auth"
-       "github.com/apache/pulsar-client-go/pkg/pb"
-       "github.com/apache/pulsar-client-go/pulsar/internal"
        "github.com/pkg/errors"
 
        log "github.com/sirupsen/logrus"
+
+       "github.com/apache/pulsar-client-go/pkg/auth"
+       "github.com/apache/pulsar-client-go/pkg/pb"
+       "github.com/apache/pulsar-client-go/pulsar/internal"
 )
 
 type client struct {
@@ -97,7 +98,7 @@ func (client *client) CreateProducer(options ProducerOptions) 
(Producer, error)
 }
 
 func (client *client) Subscribe(options ConsumerOptions) (Consumer, error) {
-       consumer, err := newConsumer(client, &options)
+       consumer, err := newConsumer(client, options)
        if err != nil {
                return nil, err
        }
diff --git a/pulsar/impl_consumer.go b/pulsar/impl_consumer.go
deleted file mode 100644
index 28535b5..0000000
--- a/pulsar/impl_consumer.go
+++ /dev/null
@@ -1,316 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package pulsar
-
-import (
-       "context"
-       "errors"
-       "fmt"
-
-       "github.com/apache/pulsar-client-go/pkg/pb"
-       "github.com/apache/pulsar-client-go/util"
-       "github.com/golang/protobuf/proto"
-
-       log "github.com/sirupsen/logrus"
-)
-
-type consumer struct {
-       topicName       string
-       consumers       []Consumer
-       log             *log.Entry
-       queue           chan ConsumerMessage
-       unackMsgTracker *UnackedMessageTracker
-}
-
-func newConsumer(client *client, options *ConsumerOptions) (*consumer, error) {
-       if options == nil {
-               return nil, newError(ResultInvalidConfiguration, "consumer 
configuration undefined")
-       }
-
-       if options.Topic == "" && options.Topics == nil && 
options.TopicsPattern == "" {
-               return nil, newError(TopicNotFound, "topic is required")
-       }
-
-       if options.SubscriptionName == "" {
-               return nil, newError(SubscriptionNotFound, "subscription name 
is required for consumer")
-       }
-
-       if options.ReceiverQueueSize == 0 {
-               options.ReceiverQueueSize = 1000
-       }
-
-       if options.TopicsPattern != "" {
-               if options.Topics != nil {
-                       return nil, newError(ResultInvalidConfiguration, "Topic 
names list must be null when use topicsPattern")
-               }
-               // TODO: impl logic
-       } else if options.Topics != nil && len(options.Topics) > 1 {
-               // TODO: impl logic
-       } else if options.Topics != nil && len(options.Topics) == 1 || 
options.Topic != "" {
-               var singleTopicName string
-               if options.Topic != "" {
-                       singleTopicName = options.Topic
-               } else {
-                       singleTopicName = options.Topics[0]
-               }
-               return singleTopicSubscribe(client, options, singleTopicName)
-       }
-
-       return nil, newError(ResultInvalidTopicName, "topic name is required 
for consumer")
-}
-
-func singleTopicSubscribe(client *client, options *ConsumerOptions, topic 
string) (*consumer, error) {
-       c := &consumer{
-               topicName: topic,
-               log:       log.WithField("topic", topic),
-               queue:     make(chan ConsumerMessage, 
options.ReceiverQueueSize),
-       }
-
-       partitions, err := client.TopicPartitions(topic)
-       if err != nil {
-               return nil, err
-       }
-
-       numPartitions := len(partitions)
-       c.consumers = make([]Consumer, numPartitions)
-
-       type ConsumerError struct {
-               err       error
-               partition int
-               cons      Consumer
-       }
-
-       ch := make(chan ConsumerError, numPartitions)
-
-       for partitionIdx, partitionTopic := range partitions {
-               // this needs to be created outside in the same go routine since
-               // newPartitionConsumer can modify the shared options struct 
causing a race condition
-               cons, err := newPartitionConsumer(client, partitionTopic, 
options, partitionIdx, numPartitions, c.queue)
-               go func(partitionIdx int, partitionTopic string) {
-                       ch <- ConsumerError{
-                               err:       err,
-                               partition: partitionIdx,
-                               cons:      cons,
-                       }
-               }(partitionIdx, partitionTopic)
-       }
-
-       for i := 0; i < numPartitions; i++ {
-               ce, ok := <-ch
-               if ok {
-                       err = ce.err
-                       c.consumers[ce.partition] = ce.cons
-               }
-       }
-
-       if err != nil {
-               // Since there were some failures, cleanup all the partitions 
that succeeded in creating the consumers
-               for _, consumer := range c.consumers {
-                       if !util.IsNil(consumer) {
-                               if err = consumer.Close(); err != nil {
-                                       panic("close consumer error, please 
check.")
-                               }
-                       }
-               }
-               return nil, err
-       }
-
-       return c, nil
-}
-
-func (c *consumer) Topic() string {
-       return c.topicName
-}
-
-func (c *consumer) Subscription() string {
-       return c.consumers[0].Subscription()
-}
-
-func (c *consumer) Unsubscribe() error {
-       var errMsg string
-       for _, consumer := range c.consumers {
-               if err := consumer.Unsubscribe(); err != nil {
-                       errMsg += fmt.Sprintf("topic %s, subscription %s: %s", 
c.Topic(), c.Subscription(), err)
-               }
-       }
-       if errMsg != "" {
-               return errors.New(errMsg)
-       }
-       return nil
-}
-
-func (c *consumer) getMessageFromSubConsumer(ctx context.Context) {
-       for _, pc := range c.consumers {
-               go func(pc Consumer) {
-                       err := pc.ReceiveAsync(ctx, c.queue)
-                       if err != nil {
-                               return
-                       }
-               }(pc)
-       }
-}
-
-func (c *consumer) Receive(ctx context.Context) (message Message, err error) {
-       if len(c.consumers) > 1 {
-               select {
-               case <-ctx.Done():
-                       return nil, ctx.Err()
-               case cMsg, ok := <-c.queue:
-                       if ok {
-                               return cMsg.Message, nil
-                       }
-                       return nil, errors.New("receive message error")
-               }
-       }
-
-       return c.consumers[0].(*partitionConsumer).Receive(ctx)
-}
-
-func (c *consumer) ReceiveAsync(ctx context.Context, msgs chan<- 
ConsumerMessage) error {
-       for _, pc := range c.consumers {
-               go func(pc Consumer) {
-                       if err := pc.ReceiveAsync(ctx, msgs); err != nil {
-                               c.log.Errorf("receive async messages error:%s, 
please check.", err.Error())
-                               return
-                       }
-               }(pc)
-       }
-
-       return nil
-}
-
-func (c *consumer) ReceiveAsyncWithCallback(ctx context.Context, callback 
func(msg Message, err error)) {
-       var err error
-       if len(c.consumers) > 1 {
-               select {
-               case <-ctx.Done():
-                       c.log.Errorf("ReceiveAsyncWithCallback: receive message 
error:%s", ctx.Err().Error())
-                       return
-               case cMsg, ok := <-c.queue:
-                       if ok {
-                               callback(cMsg.Message, err)
-                       }
-                       return
-               }
-       }
-       c.consumers[0].(*partitionConsumer).ReceiveAsyncWithCallback(ctx, 
callback)
-}
-
-//Ack the consumption of a single message
-func (c *consumer) Ack(msg Message) error {
-       return c.AckID(msg.ID())
-}
-
-// Ack the consumption of a single message, identified by its MessageID
-func (c *consumer) AckID(msgID MessageID) error {
-       id := &pb.MessageIdData{}
-       err := proto.Unmarshal(msgID.Serialize(), id)
-       if err != nil {
-               c.log.WithError(err).Errorf("unserialize message id error:%s", 
err.Error())
-               return err
-       }
-
-       partition := id.GetPartition()
-       if partition < 0 {
-               return c.consumers[0].AckID(msgID)
-       }
-       return c.consumers[partition].AckID(msgID)
-}
-
-func (c *consumer) AckCumulative(msg Message) error {
-       return c.AckCumulativeID(msg.ID())
-}
-
-func (c *consumer) AckCumulativeID(msgID MessageID) error {
-       id := &pb.MessageIdData{}
-       err := proto.Unmarshal(msgID.Serialize(), id)
-       if err != nil {
-               c.log.WithError(err).Errorf("unserialize message id error:%s", 
err.Error())
-               return err
-       }
-
-       partition := id.GetPartition()
-       if partition < 0 {
-               return errors.New("invalid partition index")
-       }
-       return c.consumers[partition].AckCumulativeID(msgID)
-}
-
-func (c *consumer) Close() error {
-       for _, pc := range c.consumers {
-               return pc.Close()
-       }
-       return nil
-}
-
-func (c *consumer) Seek(msgID MessageID) error {
-       id := &pb.MessageIdData{}
-       err := proto.Unmarshal(msgID.Serialize(), id)
-       if err != nil {
-               c.log.WithError(err).Errorf("unserialize message id error:%s", 
err.Error())
-               return err
-       }
-
-       partition := id.GetPartition()
-
-       // current topic is non-partition topic, we only need to get the first 
value in the consumers.
-       if partition < 0 {
-               partition = 0
-       }
-       return c.consumers[partition].Seek(msgID)
-}
-
-func (c *consumer) RedeliverUnackedMessages() error {
-       var errMsg string
-       for _, c := range c.consumers {
-               if err := c.RedeliverUnackedMessages(); err != nil {
-                       errMsg += fmt.Sprintf("topic %s, subscription %s: %s", 
c.Topic(), c.Subscription(), err)
-               }
-       }
-
-       if errMsg != "" {
-               return errors.New(errMsg)
-       }
-       return nil
-}
-
-func toProtoSubType(st SubscriptionType) pb.CommandSubscribe_SubType {
-       switch st {
-       case Exclusive:
-               return pb.CommandSubscribe_Exclusive
-       case Shared:
-               return pb.CommandSubscribe_Shared
-       case Failover:
-               return pb.CommandSubscribe_Failover
-       case KeyShared:
-               return pb.CommandSubscribe_Key_Shared
-       }
-
-       return pb.CommandSubscribe_Exclusive
-}
-
-func toProtoInitialPosition(p InitialPosition) 
pb.CommandSubscribe_InitialPosition {
-       switch p {
-       case Latest:
-               return pb.CommandSubscribe_Latest
-       case Earliest:
-               return pb.CommandSubscribe_Earliest
-       }
-
-       return pb.CommandSubscribe_Latest
-}
diff --git a/pulsar/impl_partition_consumer.go 
b/pulsar/impl_partition_consumer.go
deleted file mode 100644
index 83e09ad..0000000
--- a/pulsar/impl_partition_consumer.go
+++ /dev/null
@@ -1,739 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package pulsar
-
-import (
-       "context"
-       "fmt"
-       "math"
-       "sync"
-       "time"
-
-       "github.com/golang/protobuf/proto"
-
-       log "github.com/sirupsen/logrus"
-
-       "github.com/apache/pulsar-client-go/pkg/pb"
-       "github.com/apache/pulsar-client-go/pulsar/internal"
-       "github.com/apache/pulsar-client-go/util"
-)
-
-const maxRedeliverUnacknowledged = 1000
-
-type consumerState int
-
-const (
-       consumerInit consumerState = iota
-       consumerReady
-       consumerClosing
-       consumerClosed
-)
-
-type partitionConsumer struct {
-       state  consumerState
-       client *client
-       topic  string
-       log    *log.Entry
-       cnx    internal.Connection
-
-       options      *ConsumerOptions
-       consumerName *string
-       consumerID   uint64
-       subQueue     chan ConsumerMessage
-
-       omu               sync.Mutex // protects following
-       redeliverMessages []*pb.MessageIdData
-
-       unAckTracker      *UnackedMessageTracker
-       receivedSinceFlow uint32
-
-       eventsChan   chan interface{}
-       partitionIdx int
-       partitionNum int
-}
-
-func newPartitionConsumer(client *client, topic string, options 
*ConsumerOptions, partitionID, partitionNum int, ch chan<- ConsumerMessage) 
(*partitionConsumer, error) {
-       c := &partitionConsumer{
-               state:             consumerInit,
-               client:            client,
-               topic:             topic,
-               options:           options,
-               log:               log.WithField("topic", topic),
-               consumerID:        client.rpcClient.NewConsumerID(),
-               partitionIdx:      partitionID,
-               partitionNum:      partitionNum,
-               eventsChan:        make(chan interface{}, 1),
-               subQueue:          make(chan ConsumerMessage, 
options.ReceiverQueueSize),
-               receivedSinceFlow: 0,
-       }
-
-       c.setDefault(options)
-
-       if options.MessageChannel == nil {
-               options.MessageChannel = make(chan ConsumerMessage, 
options.ReceiverQueueSize)
-       } else {
-               c.subQueue = options.MessageChannel
-       }
-
-       if options.Name != "" {
-               c.consumerName = &options.Name
-       }
-
-       if options.Type == Shared || options.Type == KeyShared {
-               if options.AckTimeout != 0 {
-                       c.unAckTracker = NewUnackedMessageTracker()
-                       c.unAckTracker.pcs = append(c.unAckTracker.pcs, c)
-                       c.unAckTracker.Start(int64(options.AckTimeout))
-               }
-       }
-
-       err := c.grabCnx()
-       if err != nil {
-               log.WithError(err).Errorf("Failed to create consumer")
-               return nil, err
-       }
-       c.log = c.log.WithField("name", c.consumerName)
-       c.log.Info("Created consumer")
-       c.state = consumerReady
-
-       // In here, open a gorutine to receive data asynchronously from the 
subConsumer,
-       // filling the queue channel of the current consumer.
-       if partitionNum > 1 {
-               go func() {
-                       err = c.ReceiveAsync(context.Background(), ch)
-                       if err != nil {
-                               return
-                       }
-               }()
-       }
-
-       go c.runEventsLoop()
-
-       return c, nil
-}
-
-func (pc *partitionConsumer) setDefault(options *ConsumerOptions) {
-       if options.ReceiverQueueSize <= 0 {
-               options.ReceiverQueueSize = 1000
-       }
-
-       if options.AckTimeout == 0 {
-               options.AckTimeout = time.Second * 30
-       }
-}
-
-func (pc *partitionConsumer) grabCnx() error {
-       lr, err := pc.client.lookupService.Lookup(pc.topic)
-       if err != nil {
-               pc.log.WithError(err).Warn("Failed to lookup topic")
-               return err
-       }
-       pc.log.Debugf("Lookup result: %v", lr)
-
-       subType := toProtoSubType(pc.options.Type)
-       initialPosition := 
toProtoInitialPosition(pc.options.SubscriptionInitPos)
-       requestID := pc.client.rpcClient.NewRequestID()
-       res, err := pc.client.rpcClient.Request(lr.LogicalAddr, 
lr.PhysicalAddr, requestID,
-               pb.BaseCommand_SUBSCRIBE, &pb.CommandSubscribe{
-                       RequestId:       proto.Uint64(requestID),
-                       Topic:           proto.String(pc.topic),
-                       SubType:         subType.Enum(),
-                       Subscription:    
proto.String(pc.options.SubscriptionName),
-                       ConsumerId:      proto.Uint64(pc.consumerID),
-                       ConsumerName:    proto.String(pc.options.Name),
-                       InitialPosition: initialPosition.Enum(),
-                       Schema:          nil,
-               })
-
-       if err != nil {
-               pc.log.WithError(err).Error("Failed to create consumer")
-               return err
-       }
-
-       if res.Response.ConsumerStatsResponse != nil {
-               pc.consumerName = 
res.Response.ConsumerStatsResponse.ConsumerName
-       }
-
-       pc.cnx = res.Cnx
-       pc.log.WithField("cnx", res.Cnx).Debug("Connected consumer")
-       pc.cnx.AddConsumeHandler(pc.consumerID, pc)
-
-       msgType := res.Response.GetType()
-
-       switch msgType {
-       case pb.BaseCommand_SUCCESS:
-               return pc.internalFlow(uint32(pc.options.ReceiverQueueSize))
-       case pb.BaseCommand_ERROR:
-               errMsg := res.Response.GetError()
-               return fmt.Errorf("%s: %s", errMsg.GetError().String(), 
errMsg.GetMessage())
-       default:
-               return util.NewUnexpectedErrMsg(msgType, requestID)
-       }
-}
-
-func (pc *partitionConsumer) Topic() string {
-       return pc.topic
-}
-
-func (pc *partitionConsumer) Subscription() string {
-       return pc.options.SubscriptionName
-}
-
-func (pc *partitionConsumer) Unsubscribe() error {
-       wg := &sync.WaitGroup{}
-       wg.Add(1)
-
-       hu := &handleUnsubscribe{
-               waitGroup: wg,
-               err:       nil,
-       }
-       pc.eventsChan <- hu
-
-       wg.Wait()
-       return hu.err
-}
-
-func (pc *partitionConsumer) internalUnsubscribe(unsub *handleUnsubscribe) {
-       requestID := pc.client.rpcClient.NewRequestID()
-       _, err := pc.client.rpcClient.RequestOnCnx(pc.cnx, requestID,
-               pb.BaseCommand_UNSUBSCRIBE, &pb.CommandUnsubscribe{
-                       RequestId:  proto.Uint64(requestID),
-                       ConsumerId: proto.Uint64(pc.consumerID),
-               })
-       if err != nil {
-               pc.log.WithError(err).Error("Failed to unsubscribe consumer")
-               unsub.err = err
-       }
-
-       pc.cnx.DeleteConsumeHandler(pc.consumerID)
-       if pc.unAckTracker != nil {
-               pc.unAckTracker.Stop()
-       }
-
-       unsub.waitGroup.Done()
-}
-
-func (pc *partitionConsumer) trackMessage(msgID MessageID) error {
-       id := &pb.MessageIdData{}
-       err := proto.Unmarshal(msgID.Serialize(), id)
-       if err != nil {
-               pc.log.WithError(err).Errorf("unserialize message id error:%s", 
err.Error())
-               return err
-       }
-       if pc.unAckTracker != nil {
-               pc.unAckTracker.Add(id)
-       }
-       return nil
-}
-
-func (pc *partitionConsumer) increaseAvailablePermits() error {
-       pc.receivedSinceFlow++
-       highWater := uint32(math.Max(float64(pc.options.ReceiverQueueSize/2), 
1))
-
-       pc.log.Debugf("receivedSinceFlow size is: %d, highWater size is: %d", 
pc.receivedSinceFlow, highWater)
-
-       // send flow request after 1/2 of the queue has been consumed
-       if pc.receivedSinceFlow >= highWater {
-               pc.log.Debugf("send flow command to broker, permits size is: 
%d", pc.receivedSinceFlow)
-               err := pc.internalFlow(pc.receivedSinceFlow)
-               if err != nil {
-                       pc.log.Errorf("Send flow cmd error:%s", err.Error())
-                       pc.receivedSinceFlow = 0
-                       return err
-               }
-               pc.receivedSinceFlow = 0
-       }
-       return nil
-}
-
-func (pc *partitionConsumer) messageProcessed(msgID MessageID) error {
-       err := pc.trackMessage(msgID)
-       if err != nil {
-               return err
-       }
-
-       err = pc.increaseAvailablePermits()
-       if err != nil {
-               return err
-       }
-
-       return nil
-}
-
-func (pc *partitionConsumer) Receive(ctx context.Context) (message Message, 
err error) {
-       wg := &sync.WaitGroup{}
-       wg.Add(1)
-       pc.ReceiveAsyncWithCallback(ctx, func(msg Message, e error) {
-               message = msg
-               err = e
-               wg.Done()
-       })
-       wg.Wait()
-
-       return message, err
-}
-
-func (pc *partitionConsumer) ReceiveAsync(ctx context.Context, msgs chan<- 
ConsumerMessage) error {
-       for {
-               select {
-               case tmpMsg, ok := <-pc.subQueue:
-                       if ok {
-                               msgs <- tmpMsg
-
-                               err := pc.messageProcessed(tmpMsg.ID())
-                               if err != nil {
-                                       return err
-                               }
-                               continue
-                       }
-                       break
-               case <-ctx.Done():
-                       return ctx.Err()
-               }
-       }
-}
-
-func (pc *partitionConsumer) ReceiveAsyncWithCallback(ctx context.Context, 
callback func(msg Message, err error)) {
-       var err error
-
-       select {
-       case tmpMsg, ok := <-pc.subQueue:
-               if ok {
-                       err = pc.messageProcessed(tmpMsg.ID())
-                       callback(tmpMsg.Message, err)
-                       if err != nil {
-                               pc.log.Errorf("processed messages error:%s", 
err.Error())
-                               return
-                       }
-               }
-       case <-ctx.Done():
-               pc.log.Errorf("context shouldn't done, please check error:%s", 
ctx.Err().Error())
-               return
-       }
-}
-
-func (pc *partitionConsumer) Ack(msg Message) error {
-       return pc.AckID(msg.ID())
-}
-
-func (pc *partitionConsumer) AckID(msgID MessageID) error {
-       wg := &sync.WaitGroup{}
-       wg.Add(1)
-       ha := &handleAck{
-               msgID:     msgID,
-               waitGroup: wg,
-               err:       nil,
-       }
-       pc.eventsChan <- ha
-       wg.Wait()
-       return ha.err
-}
-
-func (pc *partitionConsumer) internalAck(ack *handleAck) {
-       id := &pb.MessageIdData{}
-       messageIDs := make([]*pb.MessageIdData, 0)
-       err := proto.Unmarshal(ack.msgID.Serialize(), id)
-       if err != nil {
-               pc.log.WithError(err).Errorf("unserialize message id error:%s", 
err.Error())
-               ack.err = err
-       }
-
-       messageIDs = append(messageIDs, id)
-
-       requestID := pc.client.rpcClient.NewRequestID()
-       _, err = pc.client.rpcClient.RequestOnCnxNoWait(pc.cnx, requestID,
-               pb.BaseCommand_ACK, &pb.CommandAck{
-                       ConsumerId: proto.Uint64(pc.consumerID),
-                       MessageId:  messageIDs,
-                       AckType:    pb.CommandAck_Individual.Enum(),
-               })
-       if err != nil {
-               pc.log.WithError(err).Error("Failed to unsubscribe consumer")
-               ack.err = err
-       }
-
-       if pc.unAckTracker != nil {
-               pc.unAckTracker.Remove(id)
-       }
-       ack.waitGroup.Done()
-}
-
-func (pc *partitionConsumer) AckCumulative(msg Message) error {
-       return pc.AckCumulativeID(msg.ID())
-}
-
-func (pc *partitionConsumer) AckCumulativeID(msgID MessageID) error {
-       hac := &handleAckCumulative{
-               msgID: msgID,
-               err:   nil,
-       }
-       pc.eventsChan <- hac
-
-       return hac.err
-}
-
-func (pc *partitionConsumer) internalAckCumulative(ackCumulative 
*handleAckCumulative) {
-       id := &pb.MessageIdData{}
-       messageIDs := make([]*pb.MessageIdData, 0)
-       err := proto.Unmarshal(ackCumulative.msgID.Serialize(), id)
-       if err != nil {
-               pc.log.WithError(err).Errorf("unserialize message id error:%s", 
err.Error())
-               ackCumulative.err = err
-       }
-       messageIDs = append(messageIDs, id)
-
-       requestID := pc.client.rpcClient.NewRequestID()
-       _, err = pc.client.rpcClient.RequestOnCnx(pc.cnx, requestID,
-               pb.BaseCommand_ACK, &pb.CommandAck{
-                       ConsumerId: proto.Uint64(pc.consumerID),
-                       MessageId:  messageIDs,
-                       AckType:    pb.CommandAck_Cumulative.Enum(),
-               })
-       if err != nil {
-               pc.log.WithError(err).Error("Failed to unsubscribe consumer")
-               ackCumulative.err = err
-       }
-
-       if pc.unAckTracker != nil {
-               pc.unAckTracker.Remove(id)
-       }
-}
-
-func (pc *partitionConsumer) Close() error {
-       if pc.state != consumerReady {
-               return nil
-       }
-       if pc.unAckTracker != nil {
-               pc.unAckTracker.Stop()
-       }
-
-       wg := sync.WaitGroup{}
-       wg.Add(1)
-
-       cc := &handlerClose{&wg, nil}
-       pc.eventsChan <- cc
-
-       wg.Wait()
-       return cc.err
-}
-
-func (pc *partitionConsumer) Seek(msgID MessageID) error {
-       wg := &sync.WaitGroup{}
-       wg.Add(1)
-
-       hc := &handleSeek{
-               msgID:     msgID,
-               waitGroup: wg,
-               err:       nil,
-       }
-       pc.eventsChan <- hc
-
-       wg.Wait()
-       return hc.err
-}
-
-func (pc *partitionConsumer) internalSeek(seek *handleSeek) {
-       if pc.state == consumerClosing || pc.state == consumerClosed {
-               pc.log.Error("Consumer was already closed")
-               return
-       }
-
-       id := &pb.MessageIdData{}
-       err := proto.Unmarshal(seek.msgID.Serialize(), id)
-       if err != nil {
-               pc.log.WithError(err).Errorf("unserialize message id error:%s", 
err.Error())
-               seek.err = err
-       }
-
-       requestID := pc.client.rpcClient.NewRequestID()
-       _, err = pc.client.rpcClient.RequestOnCnx(pc.cnx, requestID,
-               pb.BaseCommand_SEEK, &pb.CommandSeek{
-                       ConsumerId: proto.Uint64(pc.consumerID),
-                       RequestId:  proto.Uint64(requestID),
-                       MessageId:  id,
-               })
-       if err != nil {
-               pc.log.WithError(err).Error("Failed to unsubscribe consumer")
-               seek.err = err
-       }
-
-       seek.waitGroup.Done()
-}
-
-func (pc *partitionConsumer) RedeliverUnackedMessages() error {
-       wg := &sync.WaitGroup{}
-       wg.Add(1)
-
-       hr := &handleRedeliver{
-               waitGroup: wg,
-               err:       nil,
-       }
-       pc.eventsChan <- hr
-       wg.Wait()
-       return hr.err
-}
-
-func (pc *partitionConsumer) internalRedeliver(redeliver *handleRedeliver) {
-       pc.omu.Lock()
-       defer pc.omu.Unlock()
-
-       redeliverMessagesSize := len(pc.redeliverMessages)
-
-       if redeliverMessagesSize == 0 {
-               return
-       }
-
-       requestID := pc.client.rpcClient.NewRequestID()
-
-       for i := 0; i < len(pc.redeliverMessages); i += 
maxRedeliverUnacknowledged {
-               end := i + maxRedeliverUnacknowledged
-               if end > redeliverMessagesSize {
-                       end = redeliverMessagesSize
-               }
-               _, err := pc.client.rpcClient.RequestOnCnxNoWait(pc.cnx, 
requestID,
-                       pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, 
&pb.CommandRedeliverUnacknowledgedMessages{
-                               ConsumerId: proto.Uint64(pc.consumerID),
-                               MessageIds: pc.redeliverMessages[i:end],
-                       })
-               if err != nil {
-                       pc.log.WithError(err).Error("Failed to unsubscribe 
consumer")
-                       redeliver.err = err
-               }
-       }
-
-       // clear redeliverMessages slice
-       pc.redeliverMessages = nil
-
-       if pc.unAckTracker != nil {
-               pc.unAckTracker.clear()
-       }
-       redeliver.waitGroup.Done()
-}
-
-func (pc *partitionConsumer) runEventsLoop() {
-       for {
-               select {
-               case i := <-pc.eventsChan:
-                       switch v := i.(type) {
-                       case *handlerClose:
-                               pc.internalClose(v)
-                               return
-                       case *handleSeek:
-                               pc.internalSeek(v)
-                       case *handleUnsubscribe:
-                               pc.internalUnsubscribe(v)
-                       case *handleAckCumulative:
-                               pc.internalAckCumulative(v)
-                       case *handleAck:
-                               pc.internalAck(v)
-                       case *handleRedeliver:
-                               pc.internalRedeliver(v)
-                       case *handleConnectionClosed:
-                               pc.reconnectToBroker()
-                       }
-               }
-       }
-}
-
-func (pc *partitionConsumer) internalClose(req *handlerClose) {
-       if pc.state != consumerReady {
-               req.waitGroup.Done()
-               return
-       }
-
-       pc.state = consumerClosing
-       pc.log.Info("Closing consumer")
-
-       requestID := pc.client.rpcClient.NewRequestID()
-       _, err := pc.client.rpcClient.RequestOnCnxNoWait(pc.cnx, requestID, 
pb.BaseCommand_CLOSE_CONSUMER, &pb.CommandCloseConsumer{
-               ConsumerId: proto.Uint64(pc.consumerID),
-               RequestId:  proto.Uint64(requestID),
-       })
-       pc.cnx.DeleteConsumeHandler(pc.consumerID)
-
-       if err != nil {
-               req.err = err
-       } else {
-               pc.log.Info("Closed consumer")
-               pc.state = consumerClosed
-               close(pc.options.MessageChannel)
-       }
-
-       req.waitGroup.Done()
-}
-
-// Flow command gives additional permits to send messages to the consumer.
-// A typical consumer implementation will use a queue to accuMulate these 
messages
-// before the application is ready to consume them. After the consumer is 
ready,
-// the client needs to give permission to the broker to push messages.
-func (pc *partitionConsumer) internalFlow(permits uint32) error {
-       if permits <= 0 {
-               return fmt.Errorf("invalid number of permits requested: %d", 
permits)
-       }
-
-       requestID := pc.client.rpcClient.NewRequestID()
-       _, err := pc.client.rpcClient.RequestOnCnxNoWait(pc.cnx, requestID,
-               pb.BaseCommand_FLOW, &pb.CommandFlow{
-                       ConsumerId:     proto.Uint64(pc.consumerID),
-                       MessagePermits: proto.Uint32(permits),
-               })
-
-       if err != nil {
-               pc.log.WithError(err).Error("Failed to unsubscribe consumer")
-               return err
-       }
-       return nil
-}
-
-func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, 
headersAndPayload internal.Buffer) error {
-       pbMsgID := response.GetMessageId()
-       reader := internal.NewMessageReader(headersAndPayload.ReadableSlice())
-       msgMeta, err := reader.ReadMessageMetadata()
-       if err != nil {
-               // TODO send discardCorruptedMessage
-               return err
-       }
-
-       numMsgs := 1
-       if msgMeta.NumMessagesInBatch != nil {
-               numMsgs = int(msgMeta.GetNumMessagesInBatch())
-       }
-       for i := 0; i < numMsgs; i++ {
-               ssm, payload, err := reader.ReadMessage()
-               if err != nil {
-                       // TODO send
-                       return err
-               }
-
-               msgID := newMessageID(int64(pbMsgID.GetLedgerId()), 
int64(pbMsgID.GetEntryId()), i, pc.partitionIdx)
-               var msg Message
-               if ssm == nil {
-                       msg = &message{
-                               publishTime: 
timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
-                               eventTime:   
timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
-                               key:         msgMeta.GetPartitionKey(),
-                               properties:  
internal.ConvertToStringMap(msgMeta.GetProperties()),
-                               topic:       pc.topic,
-                               msgID:       msgID,
-                               payLoad:     payload,
-                       }
-               } else {
-                       msg = &message{
-                               publishTime: 
timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
-                               eventTime:   
timeFromUnixTimestampMillis(ssm.GetEventTime()),
-                               key:         ssm.GetPartitionKey(),
-                               properties:  
internal.ConvertToStringMap(ssm.GetProperties()),
-                               topic:       pc.topic,
-                               msgID:       msgID,
-                               payLoad:     payload,
-                       }
-               }
-
-               if err := pc.dispatchMessage(msg, pbMsgID); err != nil {
-                       // TODO handle error
-                       return err
-               }
-       }
-
-       return nil
-}
-
-
-func (pc *partitionConsumer) dispatchMessage(msg Message, msgID 
*pb.MessageIdData) error {
-       select {
-       case pc.subQueue <- ConsumerMessage{Consumer:pc, Message:msg}:
-               //Add messageId to redeliverMessages buffer, avoiding 
duplicates.
-               var dup bool
-
-               pc.omu.Lock()
-               for _, mid := range pc.redeliverMessages {
-                       if proto.Equal(mid, msgID) {
-                               dup = true
-                               break
-                       }
-               }
-
-               if !dup {
-                       pc.redeliverMessages = append(pc.redeliverMessages, 
msgID)
-               }
-               pc.omu.Unlock()
-       default:
-               return fmt.Errorf("consumer message channel on topic %s is full 
(capacity = %d)", pc.Topic(), cap(pc.options.MessageChannel))
-       }
-       return nil
-}
-
-type handleAck struct {
-       msgID     MessageID
-       waitGroup *sync.WaitGroup
-       err       error
-}
-
-type handleAckCumulative struct {
-       msgID MessageID
-       err   error
-}
-
-type handleUnsubscribe struct {
-       waitGroup *sync.WaitGroup
-       err       error
-}
-
-type handleSeek struct {
-       msgID     MessageID
-       waitGroup *sync.WaitGroup
-       err       error
-}
-
-type handleRedeliver struct {
-       waitGroup *sync.WaitGroup
-       err       error
-}
-
-type handlerClose struct {
-       waitGroup *sync.WaitGroup
-       err       error
-}
-
-type handleConnectionClosed struct{}
-
-func (pc *partitionConsumer) ConnectionClosed() {
-       // Trigger reconnection in the consumer goroutine
-       pc.eventsChan <- &handleConnectionClosed{}
-}
-
-func (pc *partitionConsumer) reconnectToBroker() {
-       backoff := internal.Backoff{}
-       for {
-               if pc.state != consumerReady {
-                       // Consumer is already closing
-                       return
-               }
-
-               d := backoff.Next()
-               pc.log.Info("Reconnecting to broker in ", d)
-               time.Sleep(d)
-
-               err := pc.grabCnx()
-               if err == nil {
-                       // Successfully reconnected
-                       pc.log.Info("Reconnected consumer to broker")
-                       return
-               }
-       }
-}
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index a68e34a..b1d4faa 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -34,7 +34,6 @@ import (
 
        "github.com/apache/pulsar-client-go/pkg/auth"
        "github.com/apache/pulsar-client-go/pkg/pb"
-       "github.com/apache/pulsar-client-go/util"
 )
 
 type TLSOptions struct {
@@ -286,7 +285,10 @@ func (c *connection) run() {
                        if req == nil {
                                return
                        }
-                       c.pendingReqs[req.id] = req
+                       // does this request expect a response?
+                       if req.id != RequestIDNoResponse {
+                               c.pendingReqs[req.id] = req
+                       }
                        c.writeCommand(req.cmd)
 
                case cmd := <- c.incomingCmdCh:
@@ -509,9 +511,7 @@ func (c *connection) handleCloseConsumer(closeConsumer 
*pb.CommandCloseConsumer)
        c.log.Infof("Broker notification of Closed consumer: %d", 
closeConsumer.GetConsumerId())
        consumerID := closeConsumer.GetConsumerId()
        if consumer, ok := c.consumerHandler(consumerID); ok {
-               if !util.IsNil(consumer) {
-                       consumer.ConnectionClosed()
-               }
+               consumer.ConnectionClosed()
        } else {
                c.log.WithField("consumerID", consumerID).Warnf("Consumer with 
ID not found while closing consumer")
        }
@@ -564,9 +564,14 @@ func (c *connection) Close() {
                listener.ConnectionClosed()
        }
 
+       consumerHandlers := make(map[uint64]ConsumerHandler)
        c.consumerHandlersLock.RLock()
-       defer c.consumerHandlersLock.RUnlock()
-       for _, handler := range c.consumerHandlers {
+       for id, handler := range c.consumerHandlers {
+               consumerHandlers[id] = handler
+       }
+       c.consumerHandlersLock.RUnlock()
+
+       for _, handler := range consumerHandlers {
                handler.ConnectionClosed()
        }
 }
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 35d8b61..6a01fe0 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -31,6 +31,9 @@ type RPCResult struct {
        Cnx      Connection
 }
 
+// RequestID for a request when there is no expected response
+const RequestIDNoResponse = uint64(0)
+
 type RPCClient interface {
        // Create a new unique request id
        NewRequestID() uint64
diff --git a/pulsar/test_helper.go b/pulsar/test_helper.go
index 9930a99..1762fd7 100644
--- a/pulsar/test_helper.go
+++ b/pulsar/test_helper.go
@@ -23,6 +23,8 @@ import (
        "fmt"
        "log"
        "net/http"
+       "strings"
+       "testing"
        "time"
 )
 
@@ -63,3 +65,21 @@ func httpPut(url string, body interface{}) {
        }
        resp.Body.Close()
 }
+
+func makeHTTPCall(t *testing.T, method string, urls string, body string) {
+       client := http.Client{}
+
+       req, err := http.NewRequest(method, urls, strings.NewReader(body))
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       req.Header.Set("Content-Type", "application/json")
+       req.Header.Set("Accept", "application/json")
+
+       res, err := client.Do(req)
+       if err != nil {
+               t.Fatal(err)
+       }
+       defer res.Body.Close()
+}
diff --git a/pulsar/unacked_msg_tracker.go b/pulsar/unacked_msg_tracker.go
index ffc6eff..5dfe895 100644
--- a/pulsar/unacked_msg_tracker.go
+++ b/pulsar/unacked_msg_tracker.go
@@ -174,7 +174,7 @@ func (t *UnackedMessageTracker) handlerCmd() {
                                                                MessageIds: 
messageIdsMap[int32(index)],
                                                        }
 
-                                                       _, err := 
subConsumer.client.rpcClient.RequestOnCnx(subConsumer.cnx, requestID,
+                                                       _, err := 
subConsumer.client.rpcClient.RequestOnCnx(subConsumer.conn, requestID,
                                                                
pb.BaseCommand_REDELIVER_UNACKNOWLEDGED_MESSAGES, cmd)
                                                        if err != nil {
                                                                
subConsumer.log.WithError(err).Error("Failed to unsubscribe consumer")
diff --git a/util/util.go b/util/util.go
deleted file mode 100644
index bd4f5d6..0000000
--- a/util/util.go
+++ /dev/null
@@ -1,44 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package util
-
-import (
-       "reflect"
-)
-
-// IsNil check if the interface is nil
-func IsNil(i interface{}) bool {
-       vi := reflect.ValueOf(i)
-       if vi.Kind() == reflect.Ptr {
-               return vi.IsNil()
-       }
-       return false
-}
-
-// RemoveDuplicateElement remove repeating elements from the string slice
-func RemoveDuplicateElement(addrs []string) []string {
-       result := make([]string, 0, len(addrs))
-       temp := map[string]struct{}{}
-       for _, item := range addrs {
-               if _, ok := temp[item]; !ok {
-                       temp[item] = struct{}{}
-                       result = append(result, item)
-               }
-       }
-       return result
-}
diff --git a/util/util_test.go b/util/util_test.go
deleted file mode 100644
index 3b0a9f9..0000000
--- a/util/util_test.go
+++ /dev/null
@@ -1,41 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package util
-
-import (
-       "fmt"
-       "strings"
-       "testing"
-
-       "github.com/stretchr/testify/assert"
-)
-
-func TestIsNil(t *testing.T) {
-       var a interface{}
-       var b interface{} = (*int)(nil)
-
-       assert.True(t, a == nil)
-       assert.False(t, b == nil)
-}
-
-func TestRemoveDuplicateElement(t *testing.T) {
-       s := []string{"hello", "world", "hello", "golang", "hello", "ruby", 
"php", "java"}
-       resList := RemoveDuplicateElement(s)
-       res := fmt.Sprintf("%s", resList)
-       assert.Equal(t, 1, strings.Count(res, "hello"))
-}

Reply via email to