cckellogg commented on a change in pull request #158: Topic reader 
implementation
URL: https://github.com/apache/pulsar-client-go/pull/158#discussion_r362709821
 
 

 ##########
 File path: pulsar/consumer_partition.go
 ##########
 @@ -516,15 +577,27 @@ func (pc *partitionConsumer) grabConn() error {
        initialPosition := 
toProtoInitialPosition(pc.options.subscriptionInitPos)
        requestID := pc.client.rpcClient.NewRequestID()
        cmdSubscribe := &pb.CommandSubscribe{
-               RequestId:       proto.Uint64(requestID),
-               Topic:           proto.String(pc.topic),
-               SubType:         subType.Enum(),
-               Subscription:    proto.String(pc.options.subscription),
-               ConsumerId:      proto.Uint64(pc.consumerID),
-               ConsumerName:    proto.String(pc.name),
-               InitialPosition: initialPosition.Enum(),
-               Schema:          nil,
+               Topic:                      proto.String(pc.topic),
+               Subscription:               
proto.String(pc.options.subscription),
+               SubType:                    subType.Enum(),
+               ConsumerId:                 proto.Uint64(pc.consumerID),
+               RequestId:                  proto.Uint64(requestID),
+               ConsumerName:               proto.String(pc.name),
+               PriorityLevel:              nil,
+               Durable:                    
proto.Bool(pc.options.subscriptionMode == durable),
+               Metadata:                   
internal.ConvertFromStringMap(pc.options.metadata),
+               ReadCompacted:              
proto.Bool(pc.options.readCompacted),
+               Schema:                     nil,
+               InitialPosition:            initialPosition.Enum(),
+               ReplicateSubscriptionState: 
proto.Bool(pc.options.replicateSubscriptionState),
        }
+
+       pc.startMessageID = pc.clearReceiverQueue()
 
 Review comment:
   Should we ensure that the receiver queue is only ever cleared from a single 
go routine? This was the reason it was done in the dispatcher. There could be a 
small random race condition here because the dispatcher go routine and the 
eventsLoop go routine will both be reading from the queueCh so when the nil 
message is sent there is a possibility the dispatcher will read it and then 
grabConn will never return. 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to