ilyam8 opened a new issue #6510: negative `unackedMessages` in consumers stats
URL: https://github.com/apache/pulsar/issues/6510
 
 
   **Describe the bug**
   
   Pulsar docker image `apachepulsar/pulsar:2.5.0`. Standalone mode.
   
   I noticed that the `pulsar_subscription_unacked_messages` value in the 
Prometheus stats is a negative number for `non-persistent` topics.
   
   **According to my tests (not extensive), the bug appears only when the topic 
type is `non-persistent`**.
   
   Related: #5929
   
   ___
   
   This is what I get:
   
   ```cmd
   
pulsar_subscription_unacked_messages{cluster="standalone",namespace="sample/ns1",topic="non-persistent://sample/ns1/demo-1",subscription="my-consumer-11"}
 -4739 1583659567277
   ```
   
   
   ```json
   {
       "non-persistent://sample/ns1/demo-1": {
           "publishers": [
               {
                   "msgRateIn": 8.998,
                   "msgThroughputIn": 449.904,
                   "averageMsgSize": 50,
                   "address": "/172.17.0.1:34560",
                   "producerId": 0,
                   "producerName": "my-producer-11",
                   "connectedSince": "2020-03-08T09:17:45.526Z",
                   "clientVersion": "pulsar-client-go",
                   "metadata": {}
               }
           ],
           "replication": {},
           "subscriptions": {
               "my-consumer-11": {
                   "consumers": [
                       {
                           "address": "/172.17.0.1:34560",
                           "consumerName": "",
                           "availablePermits": 1,
                           "connectedSince": "2020-03-08T09:17:45.526Z",
                           "msgRateOut": 8.998,
                           "msgThroughputOut": 449.901,
                           "msgRateRedeliver": 0,
                           "unackedMessages": -5925,
                           "blockedConsumerOnUnackedMsgs": false,
                           "clientVersion": "pulsar-client-go",
                           "metadata": {}
                       }
                   ],
                   "msgBacklog": 0,
                   "msgRateExpired": 0,
                   "msgRateOut": 8.998,
                   "msgThroughputOut": 449.901,
                   "msgRateRedeliver": 0,
                   "type": "Shared",
                   "msgDropRate": 0
               }
           },
           "producerCount": 1,
           "averageMsgSize": 49.999,
           "msgRateIn": 8.998,
           "msgRateOut": 8.998,
           "msgThroughputIn": 449.904,
           "msgThroughputOut": 449.901
       }
   }
   ```
   
   
   **To Reproduce**
   Steps to reproduce the behavior:
   
   See the code below. It creates a producer and a consumer for each of the following topics:
   
   - `non-persistent://sample/ns1/demo-1` # has negative `unackedMessages` in 
consumers stats
   - `persistent://sample/ns1/demo-1` # no problem
   
   And that is pretty much it.
   
   ```go
   package main
   
   import (
        "context"
        "fmt"
        "log"
        "os"
        "os/signal"
        "sync"
        "syscall"
        "time"
   
        comcast "github.com/Comcast/pulsar-client-go"
   )
   
   // pulsarPool is the client pool handed to every managed producer and
   // consumer constructed in this program.
   var pulsarPool = comcast.NewManagedClientPool()
   
   // newPulsarProducer builds a managed producer called name for topic, backed
   // by the shared pulsarPool and configured with the broker address
   // pulsar://localhost:6650.
   func newPulsarProducer(name, topic string) *comcast.ManagedProducer {
        clientCfg := comcast.ManagedClientConfig{
                ClientConfig: comcast.ClientConfig{Addr: "pulsar://localhost:6650"},
        }
        producerCfg := comcast.ManagedProducerConfig{
                ManagedClientConfig: clientCfg,
                Topic:               topic,
                Name:                name,
        }
        return comcast.NewManagedProducer(pulsarPool, producerCfg)
   }
   
   // newPulsarConsumer builds a managed consumer called name for topic, backed
   // by the shared pulsarPool and configured with the broker address
   // pulsar://localhost:6650.
   func newPulsarConsumer(name, topic string) *comcast.ManagedConsumer {
        clientCfg := comcast.ManagedClientConfig{
                ClientConfig: comcast.ClientConfig{Addr: "pulsar://localhost:6650"},
        }
        consumerCfg := comcast.ManagedConsumerConfig{
                ManagedClientConfig: clientCfg,
                Topic:               topic,
                Name:                name,
        }
        return comcast.NewManagedConsumer(pulsarPool, consumerCfg)
   }
   
   // startPulsarProducer publishes "message-<n>" payloads through p, pausing
   // 100ms after each successful send, until stop fires or a send returns an
   // error. On the way out it closes p (with a 5s timeout) and then calls
   // wg.Done.
   func startPulsarProducer(p *comcast.ManagedProducer, wg *sync.WaitGroup, stop chan struct{}) {
        const opTimeout = 5 * time.Second

        defer func() {
                closeCtx, cancel := context.WithTimeout(context.Background(), opTimeout)
                defer cancel()
                // Best-effort shutdown: the Close error is deliberately dropped.
                _ = p.Close(closeCtx)
                wg.Done()
        }()

        // send publishes a single payload, giving up after opTimeout.
        send := func(payload string) error {
                sendCtx, cancel := context.WithTimeout(context.Background(), opTimeout)
                defer cancel()
                _, err := p.Send(sendCtx, []byte(payload))
                return err
        }

        for seq := 0; ; seq++ {
                // Non-blocking check of the stop signal before every send.
                select {
                case <-stop:
                        return
                default:
                }

                if err := send(fmt.Sprintf("message-%d", seq)); err != nil {
                        log.Printf("producer error: %v\n", err)
                        return
                }
                time.Sleep(100 * time.Millisecond)
        }
   }
   
   // startPulsarConsumer receives, logs and acknowledges messages from c in a
   // loop until a receive or ack returns an error (each attempt is bounded by a
   // 5s timeout). On the way out it unsubscribes and closes c, then calls
   // wg.Done.
   func startPulsarConsumer(c *comcast.ManagedConsumer, wg *sync.WaitGroup) {
        const opTimeout = 5 * time.Second

        defer func() {
                shutdownCtx, cancel := context.WithTimeout(context.Background(), opTimeout)
                defer cancel()
                // Best-effort shutdown: both errors are deliberately dropped.
                _ = c.Unsubscribe(shutdownCtx)
                _ = c.Close(shutdownCtx)
                wg.Done()
        }()

        // receiveAndAck handles one message; receive and ack share one timeout.
        receiveAndAck := func() error {
                opCtx, cancel := context.WithTimeout(context.Background(), opTimeout)
                defer cancel()

                received, err := c.Receive(opCtx)
                if err != nil {
                        return err
                }
                log.Println("received:", received.Topic, string(received.Payload), *received.Msg.ConsumerId)

                return c.Ack(opCtx, received)
        }

        for {
                err := receiveAndAck()
                if err == nil {
                        continue
                }
                log.Printf("consumer error: %v\n", err)
                return
        }
   }
   
   // doPulsar runs one producer/consumer pair against a non-persistent topic
   // and another against a persistent topic, then blocks until all four
   // goroutines have finished. SIGINT closes the stop channel, which is handed
   // only to the producers.
   func doPulsar() {
        stop := make(chan struct{})
        go func() {
                sigs := make(chan os.Signal, 1)
                signal.Notify(sigs, syscall.SIGINT)
                <-sigs
                close(stop)
                log.Println("SIGINT received. Terminating...")
        }()

        const (
                nonPersistentTopic = "non-persistent://sample/ns1/demo-1"
                persistentTopic    = "persistent://sample/ns1/demo-1"
        )

        nonPersistentProducer := newPulsarProducer("my-producer-11", nonPersistentTopic)
        nonPersistentConsumer := newPulsarConsumer("my-consumer-11", nonPersistentTopic)

        persistentProducer := newPulsarProducer("my-producer-21", persistentTopic)
        persistentConsumer := newPulsarConsumer("my-consumer-21", persistentTopic)

        var workers sync.WaitGroup
        workers.Add(4)
        go startPulsarProducer(nonPersistentProducer, &workers, stop)
        go startPulsarConsumer(nonPersistentConsumer, &workers)

        go startPulsarProducer(persistentProducer, &workers, stop)
        go startPulsarConsumer(persistentConsumer, &workers)

        workers.Wait()
   }
   
   // main runs the reproduction scenario and returns once doPulsar does.
   func main() {
        doPulsar()
   }
   
   ```
   
   
   **Expected behavior**
   
   The `unackedMessages`/`pulsar_subscription_unacked_messages` values should 
never be negative.
   
   
   **Desktop (please complete the following information):**
    - ProductName:      Mac OS X, ProductVersion:       10.15.3
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to