smiletrl opened a new issue #672:
URL: https://github.com/apache/rocketmq-client-go/issues/672
**BUG REPORT**
1. Please describe the issue you observed:
- What did you do (The steps to reproduce)?
Shut down the consumer in the main goroutine (i.e., main.go) when the main
goroutine is terminated by the system. In our case, the Go application is shut
down by Kubernetes when the pod terminates. Code looks like
```
defer func() {
	consumers := orderMessage.Consumers()
	for _, consumer := range consumers {
		if consumer != nil {
			err = consumer.Shutdown()
			// log the err
			fmt.Println("consumer has shutdown")
		}
	}
}()

ch := make(chan os.Signal, 1) // signal.Notify requires a buffered channel
// k8s will send syscall.SIGTERM to the process
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
// Wait until k8s has shut down the pod.
select {
case <-ch:
	// panic will invoke the deferred func list.
	panic("gracefully shutdown")
}
```
An order message consumer looks like
```
err = pconsumer.Subscribe(constants.RocketMQTopic, selector, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
	// print the subscribe start message
	fmt.Println("subscribe start")
	// mock the time-consuming business task
	time.Sleep(5 * time.Second)
	// print the subscribe complete message
	tag := msgs[0].GetTags()
	key := msgs[0].GetKeys()
	fmt.Printf("subscribe complete: tag: %+v, key: %+v, reconsume times: %d, and time %s\n\n", tag, key, msgs[0].ReconsumeTimes, time.Now())
	return consumer.ConsumeSuccess, nil
})
```
Produce the message in a test endpoint (e.g., `GET /api/produce-message`);
code looks like
```
res, err := paidProducer.SendMessageInTransaction(ctx, msg)
```
Then do the first test:
- Run the application in one terminal;
- Run `curl http://127.0.0.1:1234/api/produce-message` in a second terminal
to produce message;
- The first terminal prints the first message `subscribe start`;
- After 5 seconds, it prints the second message `subscribe complete:...`;
- Press `Control + C` in the first terminal;
- It prints the message `consumer has shutdown`.
Then do the second test:
- Run the application in one terminal;
- Run `curl http://127.0.0.1:1234/api/produce-message` in a second terminal
to produce a message;
- The first terminal prints the first message `subscribe start`;
- Press `Control + C` in the first terminal;
- It prints the message `consumer has shutdown`.
We don't see the message `subscribe complete:...` in the second test. That
means the consumer had started consuming the message but had not finished when
the application exited.
- What did you expect to see?
I would expect the second test to print `subscribe complete:...` before
printing `consumer has shutdown`. That would mean the consumer finished all
currently active consuming before the program exited.
In our Kubernetes deployment, k8s can terminate pods at any time, and we
certainly don't want the consumer to break in the middle of processing a
message.
- What did you see instead?
In the second test, it does not print the message `subscribe complete:...`;
the program exits before the consumer finishes.
2. Please tell us about your environment:
- What is your OS?
<details><summary><code>go env</code> Output</summary><br><pre>
smiletrl@Rulins-MacBook-Pro yuanyi-api % go env
GO111MODULE="auto"
GOARCH="amd64"
GOBIN=""
GOCACHE="/Users/smiletrl/Library/Caches/go-build"
GOENV="/Users/smiletrl/Library/Application Support/go/env"
GOEXE=""
GOFLAGS=""
GOHOSTARCH="amd64"
GOHOSTOS="darwin"
GOINSECURE=""
GOMODCACHE="/Users/smiletrl/go/pkg/mod"
GONOPROXY=""
GONOSUMDB=""
GOOS="darwin"
GOPATH="/Users/smiletrl/go"
GOPRIVATE=""
GOPROXY="https://goproxy.cn,direct"
GOROOT="/usr/local/Cellar/go/1.16.2/libexec"
GOSUMDB="sum.golang.org"
GOTMPDIR=""
GOTOOLDIR="/usr/local/Cellar/go/1.16.2/libexec/pkg/tool/darwin_amd64"
GOVCS=""
GOVERSION="go1.16.2"
GCCGO="gccgo"
AR="ar"
CC="clang"
CXX="clang++"
CGO_ENABLED="1"
GOMOD="/Users/smiletrl/go/src/github.com/digitalyl/yuanyi-api/go.mod"
CGO_CFLAGS="-g -O2"
CGO_CPPFLAGS=""
CGO_CXXFLAGS="-g -O2"
CGO_FFLAGS="-g -O2"
CGO_LDFLAGS="-g -O2"
PKG_CONFIG="pkg-config"
GOGCCFLAGS="-fPIC -arch x86_64 -m64 -pthread -fno-caret-diagnostics
-Qunused-arguments -fmessage-length=0
-fdebug-prefix-map=/var/folders/sq/2123x_0d19s90tx1q2wpgtxr0000gn/T/go-build4266147259=/tmp/go-build
-gno-record-gcc-switches -fno-common"
</pre></details>
- What is your client version?
V4_8_0
- What is your RocketMQ version?
V4_8_0
3. Other information (e.g. detailed explanation, logs, related issues,
suggestions on how to fix, etc):
One workaround is to add a time sleep after the consumer is shut down, so it
waits until all currently active consuming has finished.
A better solution is to have consumer shutdown return only after its active
consuming has finished. We could add a property to the `pushConsumer` struct,
like
```
type pushConsumer struct {
	*defaultConsumer
	queueFlowControlTimes        int
	queueMaxSpanFlowControlTimes int
	consumeFunc                  utils.Set
	submitToConsume              func(*processQueue, *primitive.MessageQueue)
	subscribedTopic              map[string]string
	interceptor                  primitive.Interceptor
	queueLock                    *QueueLock
	done                         chan struct{}
	closeOnce                    sync.Once
	activeConsuming              int // new property to track active consuming
}
```
Then in the consume message call, increment the counter before consuming
starts and decrement it after consuming finishes. Something like
```
func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.MessageQueue) {
	msgs := pq.getMessages()
	if msgs == nil {
		return
	}
	for count := 0; count < len(msgs); count++ {
		var subMsgs []*primitive.MessageExt
		if count+pc.option.ConsumeMessageBatchMaxSize > len(msgs) {
			subMsgs = msgs[count:]
			count = len(msgs)
		} else {
			next := count + pc.option.ConsumeMessageBatchMaxSize
			subMsgs = msgs[count:next]
			count = next - 1
		}
		go primitive.WithRecover(func() {
			pc.activeConsuming++
			fmt.Printf("pc active consuming number: %d\n", pc.activeConsuming)
			defer func() {
				pc.activeConsuming--
				fmt.Printf("pc active consuming number later: %d\n", pc.activeConsuming)
			}()
		RETRY:
			// more consuming happens here.
```
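One caveat with the sketch above: `activeConsuming` is incremented and decremented from freshly spawned goroutines, so a plain `int` field would be a data race. A hedged, self-contained sketch of the counter using `sync/atomic` (the `activeCounter` type and method names are illustrative, not part of the client):

```
package main

import (
	"fmt"
	"sync/atomic"
)

// activeCounter tracks in-flight consume callbacks. It uses int64 so it can
// be updated with sync/atomic from concurrently running goroutines.
type activeCounter struct {
	n int64
}

// inc/dec return the counter value after the update; load reads it safely.
func (c *activeCounter) inc() int64  { return atomic.AddInt64(&c.n, 1) }
func (c *activeCounter) dec() int64  { return atomic.AddInt64(&c.n, -1) }
func (c *activeCounter) load() int64 { return atomic.LoadInt64(&c.n) }

func main() {
	var c activeCounter
	done := make(chan struct{})
	// 100 concurrent "consume" goroutines, each registering and deregistering.
	for i := 0; i < 100; i++ {
		go func() {
			c.inc()
			// ... consume one batch here ...
			c.dec()
			done <- struct{}{}
		}()
	}
	for i := 0; i < 100; i++ {
		<-done
	}
	fmt.Println("active after all goroutines finished:", c.load()) // prints 0
}
```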
Later in consumer shutdown, do something like
```
func (pc *pushConsumer) Shutdown() error {
	var err error
	pc.closeOnce.Do(func() {
		close(pc.done)
		pc.client.UnregisterConsumer(pc.consumerGroup)
		err = pc.defaultConsumer.shutdown()
		// now wait until all in-flight consuming finishes
		ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
		defer cancel()
		for {
			if pc.activeConsuming == 0 {
				return
			}
			select {
			case <-ctx.Done():
				return
			default:
				// do nothing
			}
		}
	})
	return err
}
```
`Shutdown` will then return only after all of the consumer's active consuming
has finished processing (or the timeout fires).
The idea is to make graceful consumer shutdown work like [Golang http
server shutdown](https://golang.org/pkg/net/http/#Server.Shutdown).