dsmlily commented on issue #147: consumer.Receive () is blocking, availablePermits are decreasing, but msgBacklog is increasing
URL: https://github.com/apache/pulsar-client-go/issues/147#issuecomment-569410538

> Can you provide some code and or steps to reproduce this? This will help in debugging it.

```
// ProxyConsumer fans the messages of one shared Pulsar consumer out to
// per-key channels.
type ProxyConsumer struct {
	consumer    pulsar.Consumer          // shared subscription; nil until created
	consumerMap map[string]chan<- string // key -> delivery channel
	mutex       sync.RWMutex             // guards consumerMap
}

// Register associates ch with user so that payloads routed to that key are
// forwarded to it. If pc has no consumer yet, the global consumer is created
// first and any subscription error is returned. Registering a key again
// replaces the previously registered channel.
func (pc *ProxyConsumer) Register(user string, ch chan<- string) error {
	if pc.consumer == nil {
		if err := createGlobalConsumer(); err != nil {
			return err
		}
	}

	pc.mutex.Lock()
	defer pc.mutex.Unlock()
	// createGlobalConsumer only initialises GProxyConsumer's map; writing to a
	// nil map panics, so make sure this receiver has one.
	if pc.consumerMap == nil {
		pc.consumerMap = make(map[string]chan<- string)
	}
	pc.consumerMap[user] = ch
	return nil
}

// Unregister removes the entry for user, but only if the registered channel
// is still ch; an entry that was replaced by a later Register is left alone.
func (pc *ProxyConsumer) Unregister(user string, ch chan<- string) {
	pc.mutex.Lock()
	defer pc.mutex.Unlock()
	if cur, ok := pc.consumerMap[user]; ok && cur == ch {
		delete(pc.consumerMap, user)
	}
}

// createGlobalConsumer subscribes GProxyConsumer to the response topic and
// starts the goroutine that dispatches received payloads. It is a no-op when
// GProxyConsumer already has a consumer. The subscription error, if any, is
// logged and returned.
func createGlobalConsumer() error {
	if GProxyConsumer.consumer != nil {
		return nil
	}

	responseTopic := ConfMgr.PRODUCT_ID + general.CONST_SLASH + ConfMgr.PULSAR_NAMESPACE + general.CONST_SLASH + GSourceFlag
	subscriptionName := ConfMgr.PULSAR_GLOBAL_SUBSCRIBE_NAME + general.CONST_COLON + GSourceFlag

	// KeyShared is the fallback for any unrecognised configuration value.
	subType := pulsar.KeyShared
	switch ConfMgr.PULSAR_RESPONSE_SUBTYPE {
	case subscribe_exclusive:
		subType = pulsar.Exclusive
	case subscribe_failover:
		subType = pulsar.Failover
	case subscribe_keyShared:
		subType = pulsar.KeyShared
	case subscribe_shared:
		subType = pulsar.Shared
	}

	cfg := pulsar.ConsumerOptions{
		Topic:                       responseTopic,
		SubscriptionName:            subscriptionName,
		Type:                        subType,
		ReceiverQueueSize:           ConfMgr.PULSAR_CONSUME_QUESIZE,
		SubscriptionInitialPosition: pulsar.SubscriptionPositionLatest,
	}
	log.Info("createGlobalConsumer topic: %s subscriptionName:%s", responseTopic, subscriptionName)

	headConsumer, err := GConsumerClient.Subscribe(cfg)
	if err != nil {
		log.Error("Could not establish subscription %v", err)
		return err
	}
	fmt.Println(headConsumer)

	// consumerMap is guarded by mutex everywhere else, so take it here too,
	// and keep an already-populated map instead of replacing it.
	GProxyConsumer.mutex.Lock()
	if GProxyConsumer.consumerMap == nil {
		GProxyConsumer.consumerMap = make(map[string]chan<- string)
	}
	GProxyConsumer.consumer = headConsumer
	GProxyConsumer.mutex.Unlock()

	go receiveLoop(headConsumer)
	return nil
}

// receiveLoop receives from c forever. Every received message is
// acknowledged, including ones that cannot be decoded or routed, and valid
// payloads are handed to deliver.
func receiveLoop(c pulsar.Consumer) {
	for {
		msg, err := c.Receive(context.Background())
		if err != nil {
			log.Error("receive: %v", err)
			// Back off briefly so a persistent failure does not spin the CPU
			// and flood the log.
			time.Sleep(time.Second)
			continue
		}

		payload := msg.Payload()
		log.Info("headConsumer.Receive: %s", string(payload), general.LOG_TYPE, "responce")

		// Decode the payload to find the room and user it is addressed to.
		var fields map[string]interface{}
		err = json.Unmarshal(payload, &fields)
		// The message is acknowledged whether or not it decodes.
		c.Ack(msg)
		if err != nil {
			log.Error("unmarshal payload: %v", err)
			continue
		}

		// The checked assertions reject both a missing key and a non-string
		// value; an unchecked assertion would panic on the latter.
		roomID, ok := fields["roomid"].(string)
		if !ok {
			log.Error("headConsumer.Receive not roomid", general.LOG_TYPE, "responce")
			continue
		}
		userID, ok := fields["userid"].(string)
		if !ok {
			log.Error("headConsumer.Receive not userid", general.LOG_TYPE, "responce")
			continue
		}

		deliver(general.GenIdenKey(roomID, userID), string(payload))
	}
}

// deliver sends payload to the channel registered under userKey, if any. The
// send is non-blocking: when the channel is not ready the payload is dropped.
func deliver(userKey, payload string) {
	GProxyConsumer.mutex.RLock()
	defer GProxyConsumer.mutex.RUnlock()

	ch, ok := GProxyConsumer.consumerMap[userKey]
	if !ok || ch == nil {
		return
	}
	log.Info("inchan to %s", userKey)
	select {
	case ch <- payload:
	default:
	}
}

// initGlobalPulsarConsumer connects to Pulsar using the ConfMgr settings,
// stores the client in GConsumerClient and tries to create the global
// consumer. On a connection failure it logs and returns without setting
// GConsumerClient.
func initGlobalPulsarConsumer() {
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:               ConfMgr.PULSAR_ADDR,
		ConnectionTimeout: time.Second * time.Duration(ConfMgr.PULSAR_CONNECT_TIMEOUT),
		OperationTimeout:  time.Second * time.Duration(ConfMgr.PULSAR_OPERATION_TIMEOUT),
	})
	if err != nil || client == nil {
		log.Error("failed to connect pulsar: %v", err)
		return
	}
	GConsumerClient = client

	// A subscription failure is already logged inside createGlobalConsumer,
	// and Register retries the creation while the consumer is still nil.
	_ = createGlobalConsumer()
}
```
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org With regards, Apache Git Services
