ilyam8 opened a new issue #6510: negative `unackedMessages` in consumers stats URL: https://github.com/apache/pulsar/issues/6510 **Describe the bug** Pulsar docker image `apachepulsar/pulsar:2.5.0`. Standalone mode. I noticed that the `pulsar_subscription_unacked_messages` value in the Prometheus stats is a negative number for `non-persistent` topics. **According to my tests (not extensive), the bug appears only when the topic type is `non-persistent`**. Related: #5929 ___ This is what I get: ```cmd pulsar_subscription_unacked_messages{cluster="standalone",namespace="sample/ns1",topic="non-persistent://sample/ns1/demo-1",subscription="my-consumer-11"} -4739 1583659567277 ``` ```json { "non-persistent://sample/ns1/demo-1": { "publishers": [ { "msgRateIn": 8.998, "msgThroughputIn": 449.904, "averageMsgSize": 50, "address": "/172.17.0.1:34560", "producerId": 0, "producerName": "my-producer-11", "connectedSince": "2020-03-08T09:17:45.526Z", "clientVersion": "pulsar-client-go", "metadata": {} } ], "replication": {}, "subscriptions": { "my-consumer-11": { "consumers": [ { "address": "/172.17.0.1:34560", "consumerName": "", "availablePermits": 1, "connectedSince": "2020-03-08T09:17:45.526Z", "msgRateOut": 8.998, "msgThroughputOut": 449.901, "msgRateRedeliver": 0, "unackedMessages": -5925, "blockedConsumerOnUnackedMsgs": false, "clientVersion": "pulsar-client-go", "metadata": {} } ], "msgBacklog": 0, "msgRateExpired": 0, "msgRateOut": 8.998, "msgThroughputOut": 449.901, "msgRateRedeliver": 0, "type": "Shared", "msgDropRate": 0 } }, "producerCount": 1, "averageMsgSize": 49.999, "msgRateIn": 8.998, "msgRateOut": 8.998, "msgThroughputIn": 449.904, "msgThroughputOut": 449.901 } } ``` **To Reproduce** Steps to reproduce the behavior: See the code below. It creates a publisher/consumer pair for each of the following topics: - `non-persistent://sample/ns1/demo-1` # has negative `unackedMessages` in consumers stats - `persistent://sample/ns1/demo-1` # no problem And that is pretty much it. 
```go package main import ( "context" "fmt" "log" "os" "os/signal" "sync" "syscall" "time" comcast "github.com/Comcast/pulsar-client-go" ) var pulsarPool = comcast.NewManagedClientPool() func newPulsarProducer(name, topic string) *comcast.ManagedProducer { return comcast.NewManagedProducer(pulsarPool, comcast.ManagedProducerConfig{ ManagedClientConfig: comcast.ManagedClientConfig{ ClientConfig: comcast.ClientConfig{ Addr: "pulsar://localhost:6650", }, }, Topic: topic, Name: name, }) } func newPulsarConsumer(name, topic string) *comcast.ManagedConsumer { return comcast.NewManagedConsumer(pulsarPool, comcast.ManagedConsumerConfig{ ManagedClientConfig: comcast.ManagedClientConfig{ ClientConfig: comcast.ClientConfig{ Addr: "pulsar://localhost:6650", }, }, Topic: topic, Name: name, }) } func startPulsarProducer(p *comcast.ManagedProducer, wg *sync.WaitGroup, stop chan struct{}) { defer func() { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() _ = p.Close(ctx) wg.Done() }() produce := func(msg string) error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() _, err := p.Send(ctx, []byte(msg)) return err } loop: for i := 0; ; i++ { select { case <-stop: break loop default: msg := fmt.Sprintf("message-%d", i) if err := produce(msg); err != nil { log.Printf("producer error: %v\n", err) break loop } } time.Sleep(time.Millisecond * 100) } } func startPulsarConsumer(c *comcast.ManagedConsumer, wg *sync.WaitGroup) { defer func() { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() _ = c.Unsubscribe(ctx) _ = c.Close(ctx) wg.Done() }() consume := func() error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() msg, err := c.Receive(ctx) if err != nil { return err } log.Println("received:", msg.Topic, string(msg.Payload), *msg.Msg.ConsumerId) return c.Ack(ctx, msg) } for { if err := consume(); err != nil { log.Printf("consumer error: 
%v\n", err) break } } } func doPulsar() { stopCh := make(chan struct{}) go func() { signalChan := make(chan os.Signal, 1) signal.Notify(signalChan, syscall.SIGINT) <-signalChan close(stopCh) log.Println("SIGINT received. Terminating...") }() topic1 := "non-persistent://sample/ns1/demo-1" p11 := newPulsarProducer("my-producer-11", topic1) c11 := newPulsarConsumer("my-consumer-11", topic1) topic2 := "persistent://sample/ns1/demo-1" p21 := newPulsarProducer("my-producer-21", topic2) c21 := newPulsarConsumer("my-consumer-21", topic2) var wg sync.WaitGroup wg.Add(4) go startPulsarProducer(p11, &wg, stopCh) go startPulsarConsumer(c11, &wg) go startPulsarProducer(p21, &wg, stopCh) go startPulsarConsumer(c21, &wg) wg.Wait() } func main() { doPulsar() } ``` **Expected behavior** Do not have negative `unackedMessages`/`pulsar_subscription_unacked_messages` values. **Desktop (please complete the following information):** - ProductName: Mac OS X, ProductVersion: 10.15.3
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
