cckellogg commented on a change in pull request #158: Topic reader
implementation
URL: https://github.com/apache/pulsar-client-go/pull/158#discussion_r362709821
##########
File path: pulsar/consumer_partition.go
##########
@@ -516,15 +577,27 @@ func (pc *partitionConsumer) grabConn() error {
initialPosition :=
toProtoInitialPosition(pc.options.subscriptionInitPos)
requestID := pc.client.rpcClient.NewRequestID()
cmdSubscribe := &pb.CommandSubscribe{
- RequestId: proto.Uint64(requestID),
- Topic: proto.String(pc.topic),
- SubType: subType.Enum(),
- Subscription: proto.String(pc.options.subscription),
- ConsumerId: proto.Uint64(pc.consumerID),
- ConsumerName: proto.String(pc.name),
- InitialPosition: initialPosition.Enum(),
- Schema: nil,
+ Topic: proto.String(pc.topic),
+ Subscription:
proto.String(pc.options.subscription),
+ SubType: subType.Enum(),
+ ConsumerId: proto.Uint64(pc.consumerID),
+ RequestId: proto.Uint64(requestID),
+ ConsumerName: proto.String(pc.name),
+ PriorityLevel: nil,
+ Durable:
proto.Bool(pc.options.subscriptionMode == durable),
+ Metadata:
internal.ConvertFromStringMap(pc.options.metadata),
+ ReadCompacted:
proto.Bool(pc.options.readCompacted),
+ Schema: nil,
+ InitialPosition: initialPosition.Enum(),
+ ReplicateSubscriptionState:
proto.Bool(pc.options.replicateSubscriptionState),
}
+
+ pc.startMessageID = pc.clearReceiverQueue()
Review comment:
Should we ensure that the receiver queue is only ever cleared from a single
go routine? This was the reason it was done in the dispatcher. There could be a
small random race condition here because the dispatcher go routine and the
eventsLoop go routine will both be reading from the queueCh so when the nil
message is sent there is a possibility the dispatcher will read it and then
grabConn will never return.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services