merlimat commented on a change in pull request #86: Simplify and refactor parts 
of the single topic consumer.
URL: https://github.com/apache/pulsar-client-go/pull/86#discussion_r344858784
 
 

 ##########
 File path: pulsar/consumer_partition.go
 ##########
 @@ -0,0 +1,508 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package pulsar
+
+import (
+       "fmt"
+       "math"
+       "time"
+
+       "github.com/golang/protobuf/proto"
+
+       log "github.com/sirupsen/logrus"
+
+       "github.com/apache/pulsar-client-go/pkg/pb"
+       "github.com/apache/pulsar-client-go/pulsar/internal"
+)
+
+type consumerState int
+
+const (
+       consumerInit consumerState = iota
+       consumerReady
+       consumerClosing
+       consumerClosed
+)
+
+type partitionConsumerOpts struct {
+       topic               string
+       subscription        string
+       subscriptionType    SubscriptionType
+       subscriptionInitPos SubscriptionInitialPosition
+       partitionIdx        int
+       receiverQueueSize   int
+}
+
+type partitionConsumer struct {
+       client *client
+
+       // this is needed for sending ConsumerMessage on the messageCh
+       parentConsumer Consumer
+       state          consumerState
+       options        *partitionConsumerOpts
+
+       conn internal.Connection
+
+       topic        string
+       name         string
+       consumerID   uint64
+       partitionIdx int
+
+       // shared channel
+       messageCh chan ConsumerMessage
+
+       // the number of message slots available
+       availablePermits int32
+
+       // the size of the queue channel for buffering messages
+       queueSize int32
+       queueCh   chan []*message
+
+       eventsCh    chan interface{}
+       connectedCh chan struct{}
+       closeCh     chan struct{}
+
+       log *log.Entry
+}
+
+func newPartitionConsumer(parent Consumer, client *client, options 
*partitionConsumerOpts,
+       messageCh chan ConsumerMessage) (*partitionConsumer, error) {
+       pc := &partitionConsumer{
+               state:          consumerInit,
+               parentConsumer: parent,
+               client:         client,
+               options:        options,
+               topic:          options.topic,
+               consumerID:     client.rpcClient.NewConsumerID(),
+               partitionIdx:   options.partitionIdx,
+               eventsCh:       make(chan interface{}, 3),
+               queueSize:      int32(options.receiverQueueSize),
+               queueCh:        make(chan []*message, 
options.receiverQueueSize),
+               connectedCh:    make(chan struct{}),
+               messageCh:      messageCh,
+               closeCh:        make(chan struct{}),
+               log:            log.WithField("topic", options.topic),
+       }
+
+       err := pc.grabConn()
+       if err != nil {
+               log.WithError(err).Errorf("Failed to create consumer")
+               return nil, err
+       }
+       pc.log = pc.log.WithField("name", pc.name)
+       pc.log.Info("Created consumer")
+       pc.state = consumerReady
+
+       go pc.dispatcher()
+
+       go pc.runEventsLoop()
+
+       return pc, nil
+}
+
+func (pc *partitionConsumer) Unsubscribe() error {
+       req := &unsubscribeRequest{doneCh: make(chan struct{})}
+       pc.eventsCh <- req
+
+       // wait for the request to complete
+       <-req.doneCh
+       return req.err
+}
+
+func (pc *partitionConsumer) internalUnsubscribe(unsub *unsubscribeRequest) {
+       defer close(unsub.doneCh)
+
+       requestID := pc.client.rpcClient.NewRequestID()
+       cmdUnsubscribe := &pb.CommandUnsubscribe{
+               RequestId:  proto.Uint64(requestID),
+               ConsumerId: proto.Uint64(pc.consumerID),
+       }
+       _, err := pc.client.rpcClient.RequestOnCnx(pc.conn, requestID, 
pb.BaseCommand_UNSUBSCRIBE, cmdUnsubscribe)
+       if err != nil {
+               pc.log.WithError(err).Error("Failed to unsubscribe consumer")
+               unsub.err = err
+       }
+
+       pc.conn.DeleteConsumeHandler(pc.consumerID)
+}
+
+func (pc *partitionConsumer) Ack(msg Message) error {
+       return pc.AckID(msg.ID())
+}
+
+func (pc *partitionConsumer) AckID(msgID MessageID) error {
+       req := &ackRequest{
+               doneCh: make(chan struct{}),
+               msgID:  msgID,
+       }
+       pc.eventsCh <- req
+
+       <-req.doneCh
+       return req.err
+}
+
+func (pc *partitionConsumer) Close() error {
+       if pc.state != consumerReady {
+               return nil
+       }
+
+       req := &closeRequest{doneCh: make(chan struct{})}
+       pc.eventsCh <- req
+
+       // wait for request to finish
+       <-req.doneCh
+       return req.err
+}
+
+func (pc *partitionConsumer) internalAck(req *ackRequest) {
+       defer close(req.doneCh)
+
+       id := &pb.MessageIdData{}
+       messageIDs := make([]*pb.MessageIdData, 0)
+       err := proto.Unmarshal(req.msgID.Serialize(), id)
+       if err != nil {
+               pc.log.WithError(err).Error("unable to serialize message id")
+               req.err = err
+       }
+
+       messageIDs = append(messageIDs, id)
+       requestID := internal.RequestIDNoResponse
+       cmdAck := &pb.CommandAck{
+               ConsumerId: proto.Uint64(pc.consumerID),
+               MessageId:  messageIDs,
+               AckType:    pb.CommandAck_Individual.Enum(),
+       }
+       _, err = pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, requestID, 
pb.BaseCommand_ACK, cmdAck)
+       if err != nil {
+               pc.log.WithError(err).Errorf("failed to ack message_id=%s", id)
+               req.err = err
+       }
+}
+
+func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, 
headersAndPayload internal.Buffer) error {
+       pbMsgID := response.GetMessageId()
+
+       reader := internal.NewMessageReader(headersAndPayload.ReadableSlice())
+       msgMeta, err := reader.ReadMessageMetadata()
+       if err != nil {
+               // TODO send discardCorruptedMessage
+               return err
+       }
+
+       numMsgs := 1
+       if msgMeta.NumMessagesInBatch != nil {
+               numMsgs = int(msgMeta.GetNumMessagesInBatch())
+       }
+       messages := make([]*message, numMsgs)
+       for i := 0; i < numMsgs; i++ {
+               smm, payload, err := reader.ReadMessage()
+               if err != nil {
+                       // TODO send corrupted message
+                       return err
+               }
+
+               msgID := newMessageID(int64(pbMsgID.GetLedgerId()), 
int64(pbMsgID.GetEntryId()), i, pc.partitionIdx)
+               var msg *message
+               if smm != nil {
+                       msg = &message{
+                               publishTime: 
timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+                               eventTime:   
timeFromUnixTimestampMillis(smm.GetEventTime()),
+                               key:         smm.GetPartitionKey(),
+                               properties:  
internal.ConvertToStringMap(smm.GetProperties()),
+                               topic:       pc.topic,
+                               msgID:       msgID,
+                               payLoad:     payload,
+                       }
+               } else {
+                       msg = &message{
+                               publishTime: 
timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+                               eventTime:   
timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
+                               key:         msgMeta.GetPartitionKey(),
+                               properties:  
internal.ConvertToStringMap(msgMeta.GetProperties()),
+                               topic:       pc.topic,
+                               msgID:       msgID,
+                               payLoad:     payload,
+                       }
+               }
+
+               messages[i] = msg
+       }
+
+       // send messages to the dispatcher
+       pc.queueCh <- messages
+       return nil
+}
+
+func (pc *partitionConsumer) ConnectionClosed() {
+       // Trigger reconnection in the consumer goroutine
+       pc.eventsCh <- &connectionClosed{}
+}
+
+// Flow command gives additional permits to send messages to the consumer.
+// A typical consumer implementation will use a queue to accumulate these 
messages
+// before the application is ready to consume them. After the consumer is 
ready,
+// the client needs to give permission to the broker to push messages.
+func (pc *partitionConsumer) internalFlow(permits uint32) error {
+       if permits == 0 {
+               return fmt.Errorf("invalid number of permits requested: %d", 
permits)
+       }
+
+       requestID := internal.RequestIDNoResponse
+       cmdFlow := &pb.CommandFlow{
+               ConsumerId:     proto.Uint64(pc.consumerID),
+               MessagePermits: proto.Uint32(permits),
+       }
+       _, err := pc.client.rpcClient.RequestOnCnxNoWait(pc.conn, requestID, 
pb.BaseCommand_FLOW, cmdFlow)
+       if err != nil {
+               return err
+       }
+
+       return nil
+}
+
+// dispatcher manages the internal message queue channel
+// and manages the flow control
+func (pc *partitionConsumer) dispatcher() {
+       defer func() {
+               pc.log.Infof("consumer=%d exiting dispatch loop", pc.consumerID)
+       }()
+       var messages []*message
+       for {
+               var queueCh chan []*message
+               var messageCh chan ConsumerMessage
+               var nextMessage ConsumerMessage
+
+               // are there more messages to send?
+               if len(messages) > 0 {
+                       nextMessage = ConsumerMessage{
+                               Consumer: pc.parentConsumer,
+                               Message:  messages[0],
+                       }
+                       messageCh = pc.messageCh
+               } else {
+                       // we are ready for more messages
+                       queueCh = pc.queueCh
+               }
+
+               select {
+               case <-pc.closeCh:
+                       return
+
+               case _, ok := <-pc.connectedCh:
+                       if !ok {
+                               return
+                       }
+                       pc.log.Debug("dispatcher received connection event")
+
+                       // drain messages
+                       messages = nil
+
+                       // drain the message queue on any new connection by 
sending a
+                       // special nil message to the channel so we know when 
to stop dropping messages
+                       go func() {
+                               pc.queueCh <- nil
+                       }()
+                       for m := range pc.queueCh {
+                               // the queue has been drained
+                               if m == nil {
+                                       break
+                               }
+                       }
+
+                       // reset available permits
+                       pc.availablePermits = 0
+                       initialPermits := uint32(pc.queueSize)
+
+                       pc.log.Debugf("dispatcher requesting initial 
permits=%d", initialPermits)
+                       // send initial permits
+                       if err := pc.internalFlow(initialPermits); err != nil {
+                               pc.log.WithError(err).Error("unable to send 
initial permits to broker")
+                       }
+
+               case msgs, ok := <-queueCh:
+                       if !ok {
+                               return
+                       }
+                       // we only read messages here after the consumer has 
processed all messages
+                       // in the previous batch
+                       messages = msgs
+
+               // if the messageCh is nil or the messageCh is full this will 
not be selected
+               case messageCh <- nextMessage:
+                       // allow this message to be garbage collected
+                       messages[0] = nil
+                       messages = messages[1:]
+
+                       // TODO implement a better flow controller
+                       // send more permits if needed
+                       pc.availablePermits++
+                       flowThreshold := 
int32(math.Max(float64(pc.queueSize/2), 1))
+                       if pc.availablePermits >= flowThreshold {
+                               availablePermits := pc.availablePermits
+                               requestedPermits := availablePermits
+                               pc.availablePermits = 0
+
+                               pc.log.Debugf("requesting more permits=%d 
available=%d", requestedPermits, availablePermits)
+                               if err := 
pc.internalFlow(uint32(requestedPermits)); err != nil {
+                                       pc.log.WithError(err).Error("unable to 
send permits")
+                               }
+                       }
+               }
+       }
+}
+
+type ackRequest struct {
+       doneCh chan struct{}
+       msgID  MessageID
+       err    error
+}
+
+type unsubscribeRequest struct {
+       doneCh chan struct{}
+       err    error
+}
+
+type closeRequest struct {
+       doneCh chan struct{}
+       err    error
+}
+
+func (pc *partitionConsumer) runEventsLoop() {
+       defer func() {
+               pc.log.Infof("consumer=%d exiting events loop", pc.consumerID)
+       }()
+       for {
+               select {
+               case <-pc.closeCh:
+                       return
+               case i := <-pc.eventsCh:
+                       switch v := i.(type) {
+                       case *ackRequest:
+                               pc.internalAck(v)
+                       case *unsubscribeRequest:
+                               pc.internalUnsubscribe(v)
+                       case *connectionClosed:
+                               pc.reconnectToBroker()
+                       case *closeRequest:
+                               pc.internalClose(v)
+                               return
+                       }
+               }
+       }
+}
+
+func (pc *partitionConsumer) internalClose(req *closeRequest) {
+       defer close(req.doneCh)
+       if pc.state != consumerReady {
+               return
+       }
+
+       pc.state = consumerClosing
+       pc.log.Infof("Closing consumer=%d", pc.consumerID)
 
 Review comment:
   Since we're repeating the consumer ID, we should put the field directly in the 
logger to ensure consistency. 
   
   In this case, though, the consumer ID is meaningless to the application: it's an 
internal ID, and I'd prefer to just hide it (except for client-library 
debugging).
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to