h-j-13 opened a new issue #488:
URL: https://github.com/apache/pulsar-client-go/issues/488


   #### Expected behavior / Actual behavior
   I use pulsar-client-go write a demo program to consume message from pulsar, 
found that the **message consumption rate is fluctuating** and consume 
performance is very low.  
   But I think the consumption rate should be stable.  
   
   When use python pulsar api to make a demo to consume the same topic, the 
message consumption rate is stable, and performance is better than the go demo, 
confused...
   
   In addition, I found that the message consumption rate of go demo will keep 
falling at first , after a few minutes, the rate will increase. and then keep 
looping this process again. When the consumption rate is low, the the CPU 
occupancy rate of the  go demo process is also low, only around 20%. 
   At the same time, the python demo's  CPU occupancy rate is always around 
120%.
   
   **Maybe I used the pulsar-client-go api incorrectly**? I also try use more 
go goroutine to receive or create more shared consumer consume together. But it 
doesn't seem to improve a lot.
   
   ---
   
   #### Steps to reproduce
   I have produced some messages of the same size (the size is about 2k) in 
advance, the data is randomly generated, and will not produce during the 
consumption of the demo program. Then start the demo programe
   
   here is my code
   
   Go demo
   ```go
   package main
   
   import (
        "context"
        "github.com/apache/pulsar-client-go/pulsar"
        "log"
        "strconv"
        "sync/atomic"
        "time"
   )
   
   var count int32
   
   func ShowConsumeNum() {
        ticker := time.NewTicker(time.Duration(60) * time.Second)
        defer ticker.Stop()
   
        for range ticker.C {
                log.Println("consume " + 
strconv.Itoa(int(atomic.LoadInt32(&count))) + " msg")
                atomic.StoreInt32(&count, 0)
        }
   }
   
   func main() {
        go ShowConsumeNum()
   
        client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "my url"})
        if err != nil {
                log.Fatal(err)
        }
   
        defer client.Close()
   
        consumer, err := client.Subscribe(pulsar.ConsumerOptions{
                Topic:                       "my_topic",
                SubscriptionName:            "test_consumer_" + 
strconv.FormatInt(time.Now().Unix(), 10),
                Type:                        pulsar.Shared,
                SubscriptionInitialPosition: 
pulsar.SubscriptionPositionEarliest,
        })
        if err != nil {
                log.Fatal(err)
        }
        defer consumer.Close()
   
        atomic.StoreInt32(&count, 0)
   
        for {
                msg, err := consumer.Receive(context.Background())
                if err != nil {
                        log.Fatal(err)
                }
                //fmt.Printf("Received message msgId: %#v -- content: '%s'\n", 
                                         msg.ID(), string(msg.Payload()))
                consumer.Ack(msg)
                atomic.AddInt32(&count, 1)
   
        }
   
        if err := consumer.Unsubscribe(); err != nil {
                log.Fatal(err)
        }
   }
   
   ```
   
   Go demo result
   ```shell
   go build test_consumer.go
   ./test_consumer
   
   ...
   
   2021/03/15 16:48:45 consume 1250000 msg
   2021/03/15 16:49:45 consume 646270 msg
   2021/03/15 16:50:45 consume 497119 msg
   2021/03/15 16:51:45 consume 422100 msg
   2021/03/15 16:52:45 consume 372000 msg
   2021/03/15 16:53:45 consume 338500 msg
   2021/03/15 16:54:45 consume 312500 msg
   2021/03/15 16:55:45 consume 288000 msg
   2021/03/15 16:56:45 consume 273500 msg
   2021/03/15 16:57:45 consume 258000 msg
   2021/03/15 16:58:45 consume 246000 msg
   2021/03/15 16:59:45 consume 634000 msg
   2021/03/15 17:00:45 consume 928500 msg
   2021/03/15 17:01:45 consume 597842 msg
   2021/03/15 17:02:45 consume 478458 msg
   2021/03/15 17:03:45 consume 409700 msg
   2021/03/15 17:04:45 consume 363500 msg
   2021/03/15 17:05:45 consume 331500 msg
   2021/03/15 17:06:45 consume 305300 msg
   2021/03/15 17:07:45 consume 285800 msg
   2021/03/15 17:08:45 consume 265900 msg
   2021/03/15 17:09:45 consume 252500 msg
   2021/03/15 17:10:45 consume 615500 msg
   2021/03/15 17:11:45 consume 923500 msg
   2021/03/15 17:12:45 consume 602100 msg
   2021/03/15 17:13:45 consume 479400 msg
   2021/03/15 17:14:45 consume 414920 msg
   2021/03/15 17:15:45 consume 366576 msg
   2021/03/15 17:16:45 consume 327500 msg
   2021/03/15 17:17:45 consume 304426 msg
   2021/03/15 17:18:45 consume 285570 msg
   2021/03/15 17:19:45 consume 272000 msg
   2021/03/15 17:20:45 consume 251900 msg
   2021/03/15 17:21:45 consume 244100 msg
   
   ...
   ```
   
   ---
   
   Python demo
   ```python
   #!/usr/bin/env python3
   # -*- coding: utf-8 -*-
   
   import time
   import pulsar
   import threading
   
   PULSAR_URL = "my url"
   TOPIC = "my_topic"
   RECEIVE_MSG_TIMEOUT_MS = 3000
   CONSUMER_TYPE = pulsar.ConsumerType.Shared
   
   consume_cnt = 0
   
   
   def consume():
       client = pulsar.Client(PULSAR_URL)
       consumer = client.subscribe(TOPIC,
                                   subscription_name="test_consume_py" + 
str(time.time()),
                                   consumer_type=pulsar.ConsumerType.Shared,
                                   
initial_position=pulsar.InitialPosition.Earliest)
       global consume_cnt
   
       while True:
           try:
               msg = consumer.receive(RECEIVE_MSG_TIMEOUT_MS)
               consumer.acknowledge(msg)
               consume_cnt += 1
           except KeyboardInterrupt:
               exit(-1)
       # never close
   
   
   class myThread(threading.Thread):
       def __init__(self,):
           threading.Thread.__init__(self)
   
       def run(self):
           show_num()
   
   
   def show_num():
       # count consume msg num every 1 min
       while 1:
           global consume_cnt
           time.sleep(60)
           print(time.strftime('%Y-%m-%d 
%H:%M:%S',time.localtime(time.time()))+ " consume " + str(consume_cnt) + " 
msg.")
           consume_cnt = 0
   
   
   if __name__ == '__main__':
       thread1 = myThread()
       thread1.start()
       consume()
       thread1.join()
   ```
   
   
   Python demo result
   ```shell
   ...
   2021-03-15 17:15:22 consume 3719645 msg.
   2021-03-15 17:16:22 consume 3731980 msg.
   2021-03-15 17:17:22 consume 3762029 msg.
   2021-03-15 17:18:22 consume 3799080 msg.
   2021-03-15 17:19:22 consume 3990855 msg.
   2021-03-15 17:20:22 consume 3813902 msg.
   2021-03-15 17:21:22 consume 3718900 msg.
   2021-03-15 17:22:22 consume 3646500 msg.
   ```
   
   #### System configuration
   OS: Centos
   go version: 1.15
   **pulsar go client version**: v0.4.0
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to