h-j-13 opened a new issue #488:
URL: https://github.com/apache/pulsar-client-go/issues/488
#### Expected behavior / Actual behavior
I wrote a demo program with pulsar-client-go to consume messages from Pulsar
and found that the **message consumption rate fluctuates** and consumption
performance is very low.
I expected the consumption rate to be stable.
When I use the Python Pulsar API to consume the same topic, the message
consumption rate is stable and performance is better than the Go demo, which
confuses me.
In addition, the Go demo's consumption rate keeps falling at first; after a
few minutes it rises again, and then the cycle repeats. While the consumption
rate is low, the CPU usage of the Go demo process is also low, only around 20%.
Meanwhile, the Python demo's CPU usage is always around 120%.
**Maybe I am using the pulsar-client-go API incorrectly?** I also tried using
more goroutines to receive, and creating more shared consumers to consume
together, but neither seems to improve things much.
---
#### Steps to reproduce
I produced some messages of the same size (about 2 KB each) in advance. The
data is randomly generated, and nothing is produced while the demo program is
consuming. Then I start the demo program.
Here is my code.
Go demo
```go
package main

import (
	"context"
	"log"
	"strconv"
	"sync/atomic"
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
)

// count is the number of messages acked since the last report.
var count int32

// ShowConsumeNum logs the message count once a minute, then resets it.
func ShowConsumeNum() {
	ticker := time.NewTicker(time.Duration(60) * time.Second)
	defer ticker.Stop()
	for range ticker.C {
		log.Println("consume " +
			strconv.Itoa(int(atomic.LoadInt32(&count))) + " msg")
		atomic.StoreInt32(&count, 0)
	}
}

func main() {
	go ShowConsumeNum()

	client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "my url"})
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// A fresh subscription name on every run, starting from the earliest message.
	consumer, err := client.Subscribe(pulsar.ConsumerOptions{
		Topic: "my_topic",
		SubscriptionName: "test_consumer_" +
			strconv.FormatInt(time.Now().Unix(), 10),
		Type:                        pulsar.Shared,
		SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()

	atomic.StoreInt32(&count, 0)
	for {
		msg, err := consumer.Receive(context.Background())
		if err != nil {
			log.Fatal(err)
		}
		//fmt.Printf("Received message msgId: %#v -- content: '%s'\n", msg.ID(), string(msg.Payload()))
		consumer.Ack(msg)
		atomic.AddInt32(&count, 1)
	}

	// Never reached: the loop above does not exit.
	if err := consumer.Unsubscribe(); err != nil {
		log.Fatal(err)
	}
}
```
Go demo result
```shell
go build test_consumer.go
./test_consumer
...
2021/03/15 16:48:45 consume 1250000 msg
2021/03/15 16:49:45 consume 646270 msg
2021/03/15 16:50:45 consume 497119 msg
2021/03/15 16:51:45 consume 422100 msg
2021/03/15 16:52:45 consume 372000 msg
2021/03/15 16:53:45 consume 338500 msg
2021/03/15 16:54:45 consume 312500 msg
2021/03/15 16:55:45 consume 288000 msg
2021/03/15 16:56:45 consume 273500 msg
2021/03/15 16:57:45 consume 258000 msg
2021/03/15 16:58:45 consume 246000 msg
2021/03/15 16:59:45 consume 634000 msg
2021/03/15 17:00:45 consume 928500 msg
2021/03/15 17:01:45 consume 597842 msg
2021/03/15 17:02:45 consume 478458 msg
2021/03/15 17:03:45 consume 409700 msg
2021/03/15 17:04:45 consume 363500 msg
2021/03/15 17:05:45 consume 331500 msg
2021/03/15 17:06:45 consume 305300 msg
2021/03/15 17:07:45 consume 285800 msg
2021/03/15 17:08:45 consume 265900 msg
2021/03/15 17:09:45 consume 252500 msg
2021/03/15 17:10:45 consume 615500 msg
2021/03/15 17:11:45 consume 923500 msg
2021/03/15 17:12:45 consume 602100 msg
2021/03/15 17:13:45 consume 479400 msg
2021/03/15 17:14:45 consume 414920 msg
2021/03/15 17:15:45 consume 366576 msg
2021/03/15 17:16:45 consume 327500 msg
2021/03/15 17:17:45 consume 304426 msg
2021/03/15 17:18:45 consume 285570 msg
2021/03/15 17:19:45 consume 272000 msg
2021/03/15 17:20:45 consume 251900 msg
2021/03/15 17:21:45 consume 244100 msg
...
```
---
Python demo
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import pulsar
import threading

PULSAR_URL = "my url"
TOPIC = "my_topic"
RECEIVE_MSG_TIMEOUT_MS = 3000
CONSUMER_TYPE = pulsar.ConsumerType.Shared
consume_cnt = 0


def consume():
    client = pulsar.Client(PULSAR_URL)
    consumer = client.subscribe(TOPIC,
                                subscription_name="test_consume_py" + str(time.time()),
                                consumer_type=pulsar.ConsumerType.Shared,
                                initial_position=pulsar.InitialPosition.Earliest)
    global consume_cnt
    while True:
        try:
            msg = consumer.receive(RECEIVE_MSG_TIMEOUT_MS)
            consumer.acknowledge(msg)
            consume_cnt += 1
        except KeyboardInterrupt:
            exit(-1)
    # never close


class myThread(threading.Thread):
    def __init__(self,):
        threading.Thread.__init__(self)

    def run(self):
        show_num()


def show_num():
    # count consume msg num every 1 min
    while 1:
        global consume_cnt
        time.sleep(60)
        print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
              + " consume " + str(consume_cnt) + " msg.")
        consume_cnt = 0


if __name__ == '__main__':
    thread1 = myThread()
    thread1.start()
    consume()
    thread1.join()
```
Python demo result
```shell
...
2021-03-15 17:15:22 consume 3719645 msg.
2021-03-15 17:16:22 consume 3731980 msg.
2021-03-15 17:17:22 consume 3762029 msg.
2021-03-15 17:18:22 consume 3799080 msg.
2021-03-15 17:19:22 consume 3990855 msg.
2021-03-15 17:20:22 consume 3813902 msg.
2021-03-15 17:21:22 consume 3718900 msg.
2021-03-15 17:22:22 consume 3646500 msg.
```
#### System configuration
OS: CentOS
go version: 1.15
**pulsar go client version**: v0.4.0