dsmlily commented on issue #147: consumer.Receive () is blocking, 
availablePermits are decreasing, but msgBacklog is increasing
URL: 
https://github.com/apache/pulsar-client-go/issues/147#issuecomment-569410538
 
 
   > Can you provide some code and/or steps to reproduce this? This will help 
in debugging it.
   
   ```
   
   // ProxyConsumer pairs a single Pulsar consumer with a registry of per-user
   // channels that received payloads are forwarded to.
   type ProxyConsumer struct {
        // consumer is the underlying Pulsar consumer. Register treats a nil
        // value as "not created yet" and triggers createGlobalConsumer.
        consumer    pulsar.Consumer
        // consumerMap maps a user key to the send-only channel that gets the
        // payloads addressed to that key.
        consumerMap map[string]chan<- string
        // mutex guards consumerMap: Register/Unregister take the write lock,
        // the receive loop takes the read lock while looking up a channel.
        mutex       sync.RWMutex
   }
   // Register associates user with ch so that payloads addressed to user are
   // forwarded to ch. If the consumer has not been created yet it is created
   // first, and any error from that step is returned unchanged.
   //
   // Registering a key that already exists replaces the previous channel; no
   // "already exist" error is reported.
   func (pc *ProxyConsumer) Register(user string, ch chan<- string) error {
        if pc.consumer == nil {
                if err := createGlobalConsumer(); err != nil {
                        return err
                }
        }

        pc.mutex.Lock()
        defer pc.mutex.Unlock()

        // createGlobalConsumer initialises the map on GProxyConsumer and can
        // return early without touching it; writing to a nil map panics, so
        // make sure this instance has one before storing the channel.
        if pc.consumerMap == nil {
                pc.consumerMap = make(map[string]chan<- string)
        }
        pc.consumerMap[user] = ch

        return nil
   }
   
   // Unregister removes the entry for user, but only when the channel currently
   // stored under that key is ch itself. A different channel registered later
   // under the same key is left in place.
   func (pc *ProxyConsumer) Unregister(user string, ch chan<- string) {
        pc.mutex.Lock()
        defer pc.mutex.Unlock()

        registered, found := pc.consumerMap[user]
        if found && registered == ch {
                delete(pc.consumerMap, user)
        }
   }
   
   func createGlobalConsumer() error {
        if GProxyConsumer.consumer != nil {
                return nil
        }
        responceTopic := ConfMgr.PRODUCT_ID + general.CONST_SLASH + 
ConfMgr.PULSAR_NAMESPACE+ general.CONST_SLASH +  GSourceFlag
        subscriptionName := ConfMgr.PULSAR_GLOBAL_SUBSCRIBE_NAME + 
general.CONST_COLON + GSourceFlag
   
        var t = pulsar.KeyShared
        switch ConfMgr.PULSAR_RESPONSE_SUBTYPE {
        case subscribe_exclusive:
                t = pulsar.Exclusive
        case subscribe_failover:
                t = pulsar.Failover
        case subscribe_keyShared:
                t = pulsar.KeyShared
        case subscribe_shared:
                t = pulsar.Shared
        }
   
        cfg := pulsar.ConsumerOptions{
                Topic:             responceTopic,
                SubscriptionName:  subscriptionName,
                Type:              t,
                ReceiverQueueSize: ConfMgr.PULSAR_CONSUME_QUESIZE,
                SubscriptionInitialPosition: pulsar.SubscriptionPositionLatest,
        }
   
        log.Info("createGlobalConsumer topic: %s 
subscriptionName:%s",responceTopic, subscriptionName)
   
        headConsumer, err := GConsumerClient.Subscribe(cfg)
        if err != nil {
                log.Error("Could not establish subscription %v", err)
                return err
        }
        fmt.Println(headConsumer)
   
        GProxyConsumer.consumerMap = make(map[string]chan<- string)
        GProxyConsumer.consumer = headConsumer
        go func() {
                for {
                        ctx := context.Background()
                        msg, err := headConsumer.Receive(ctx)
                        if err != nil {
                                log.Error("receive")
                                continue
                        }
   
                        log.Info("headConsumer.Receive: %s", 
string(msg.Payload()),general.LOG_TYPE,"responce")
                        //解析出userid
                        var mapMsg map[string]interface{}
                        json.Unmarshal(msg.Payload(), &mapMsg)
                        if err != nil {
                                log.Error("error")
                                headConsumer.Ack(msg)
                                continue
                        }
                        headConsumer.Ack(msg)
   
                        //log.Info("headConsumer.Receive:map %v", mapMsg)
   
                        roomId, ok := mapMsg["roomid"]
                        if !ok {
                                log.Error("headConsumer.Receive not 
roomid",general.LOG_TYPE,"responce")
                                continue
                        }
                        userId, ok := mapMsg["userid"]
                        if !ok {
                                log.Error("headConsumer.Receive not 
userid",general.LOG_TYPE,"responce")
                                continue
                        }
   
                        userkey := general.GenIdenKey(roomId.(string), 
userId.(string))
                        GProxyConsumer.mutex.RLock()
                        if ch, ok := GProxyConsumer.consumerMap[userkey]; ok && 
ch != nil {
   
                                log.Info("inchan to %s", userkey)
                                select {
                                case ch <- string(msg.Payload()):
                                default:
                                }
                        }
                        GProxyConsumer.mutex.RUnlock()
                        //headConsumer.Ack(msg)
                }
        }()
   
        return nil
   }
   // initGlobalPulsarConsumer creates the Pulsar client from ConfMgr settings,
   // stores it in GConsumerClient, and then creates the global consumer.
   // Failures are logged; the function returns nothing.
   func initGlobalPulsarConsumer() {
        client, err := pulsar.NewClient(pulsar.ClientOptions{
                URL: ConfMgr.PULSAR_ADDR,
                ConnectionTimeout: time.Second *
                        time.Duration(ConfMgr.PULSAR_CONNECT_TIMEOUT),
                OperationTimeout: time.Second *
                        time.Duration(ConfMgr.PULSAR_OPERATION_TIMEOUT),
        })

        if err != nil || client == nil {
                // Include the cause so a misconfigured address or timeout can
                // be diagnosed from the log.
                log.Error("failed to connect pulsar: %v", err)
                return
        }

        GConsumerClient = client

        // There is no caller to return the error to, so report it here
        // instead of discarding it.
        if err := createGlobalConsumer(); err != nil {
                log.Error("initial global consumer setup failed: %v", err)
        }
   }
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to