smiletrl opened a new issue #672:
URL: https://github.com/apache/rocketmq-client-go/issues/672


   **BUG REPORT**  
   
   1. Please describe the issue you observed:
   
       - What did you do (The steps to reproduce)?
   
   Shut down the consumer in the main goroutine (i.e., in main.go) when the main goroutine is terminated by the system. In our case, the Go application is shut down when Kubernetes terminates the pod. The code looks like
   
   ```
        defer func() {
                consumers := orderMessage.Consumers()
                for _, consumer := range consumers {
                        if consumer != nil {
                                if err := consumer.Shutdown(); err != nil {
                                        // log the err
                                }
                                fmt.Println("consumer has shutdown")
                        }
                }
        }()

        // k8s will send syscall.SIGTERM to the process; the channel is
        // buffered because signal.Notify requires a buffered channel.
        ch := make(chan os.Signal, 1)
        signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)

        // Wait until k8s shuts down the pod.
        <-ch
        // panic will invoke the deferred func list.
        panic("gracefully shutdown")
   ```
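   (Note that `panic` is used here instead of `os.Exit` on purpose: `os.Exit` terminates the process without running deferred functions, while a panic in the main goroutine unwinds the defer list so the consumer shutdown runs.)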
   
   An order message consumer looks like
   
   ```
        err = pconsumer.Subscribe(constants.RocketMQTopic, selector,
                func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
                        // print the subscribe start message
                        fmt.Println("subscribe start")

                        // mock the time-consuming business task
                        time.Sleep(5 * time.Second)

                        // print the subscribe complete message
                        key := msgs[0].GetKeys()
                        fmt.Printf("subscribe complete: tag: %+v, key: %+v, reconsume times: %d, time %s\n\n",
                                msgs[0].GetTags(), key, msgs[0].ReconsumeTimes, time.Now())
                        return consumer.ConsumeSuccess, nil
                })
   ```
   
   Produce the message in a test endpoint (e.g., `GET /api/produce-message`); the code looks like
   
   ```
   res, err := paidProducer.SendMessageInTransaction(ctx, msg)
   ```
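   For completeness, a minimal sketch of that endpoint, assuming `paidProducer` is an already started transaction producer and reusing `constants.RocketMQTopic` from above (the handler wiring and payload are illustrative, not the real endpoint):
   
   ```
   http.HandleFunc("/api/produce-message", func(w http.ResponseWriter, r *http.Request) {
        // illustrative payload; the real endpoint builds a business message
        msg := primitive.NewMessage(constants.RocketMQTopic, []byte("test payload"))
        res, err := paidProducer.SendMessageInTransaction(r.Context(), msg)
        if err != nil {
                http.Error(w, err.Error(), http.StatusInternalServerError)
                return
        }
        fmt.Fprintf(w, "sent, msgId: %s\n", res.MsgID)
   })
   ```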
   
   Then do the first test:
   
   - Run the application in one terminal;
   - Run `curl http://127.0.0.1:1234/api/produce-message` in a second terminal to produce a message;
   - The first terminal will print the first message `subscribe start`;
   - After 5 seconds, it will print the second message `subscribe complete:...`;
   - Press `Control + C` in the first terminal;
   - It will print the message `consumer has shutdown`.
   
   Do the second test:
   
   - Run the application in one terminal;
   - Run `curl http://127.0.0.1:1234/api/produce-message` in a second terminal to produce a message;
   - The first terminal will print the first message `subscribe start`;
   - Press `Control + C` in the first terminal;
   - It will print the message `consumer has shutdown`.
   
   We don't see the message `subscribe complete:...` in the second test. This means the consumer had started consuming the message but had not finished when the application exited.
   
       - What did you expect to see?
   I would expect the second test to print `subscribe complete:...` before printing `consumer has shutdown`. That would mean the consumer finished all in-flight consumption before the program exited.
   
   In our Kubernetes deployment, k8s can terminate pods at any time, and we certainly don't want the consumer to break in the middle of processing a message.
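   (When k8s terminates a pod it sends SIGTERM first and, after `terminationGracePeriodSeconds` elapses (30 seconds by default), SIGKILL, so any graceful shutdown has to complete within that window.)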
   
       - What did you see instead?
   In the second test, the message `subscribe complete:...` is never printed. The program exits before the consumer has finished.
   
   2. Please tell us about your environment:
   
        - What is your OS?
   <details><summary><code>go env</code> Output</summary><br><pre>
   smiletrl@Rulins-MacBook-Pro yuanyi-api % go env
   GO111MODULE="auto"
   GOARCH="amd64"
   GOBIN=""
   GOCACHE="/Users/smiletrl/Library/Caches/go-build"
   GOENV="/Users/smiletrl/Library/Application Support/go/env"
   GOEXE=""
   GOFLAGS=""
   GOHOSTARCH="amd64"
   GOHOSTOS="darwin"
   GOINSECURE=""
   GOMODCACHE="/Users/smiletrl/go/pkg/mod"
   GONOPROXY=""
   GONOSUMDB=""
   GOOS="darwin"
   GOPATH="/Users/smiletrl/go"
   GOPRIVATE=""
   GOPROXY="https://goproxy.cn,direct"
   GOROOT="/usr/local/Cellar/go/1.16.2/libexec"
   GOSUMDB="sum.golang.org"
   GOTMPDIR=""
   GOTOOLDIR="/usr/local/Cellar/go/1.16.2/libexec/pkg/tool/darwin_amd64"
   GOVCS=""
   GOVERSION="go1.16.2"
   GCCGO="gccgo"
   AR="ar"
   CC="clang"
   CXX="clang++"
   CGO_ENABLED="1"
   GOMOD="/Users/smiletrl/go/src/github.com/digitalyl/yuanyi-api/go.mod"
   CGO_CFLAGS="-g -O2"
   CGO_CPPFLAGS=""
   CGO_CXXFLAGS="-g -O2"
   CGO_FFLAGS="-g -O2"
   CGO_LDFLAGS="-g -O2"
   PKG_CONFIG="pkg-config"
   GOGCCFLAGS="-fPIC -arch x86_64 -m64 -pthread -fno-caret-diagnostics -Qunused-arguments -fmessage-length=0 -fdebug-prefix-map=/var/folders/sq/2123x_0d19s90tx1q2wpgtxr0000gn/T/go-build4266147259=/tmp/go-build -gno-record-gcc-switches -fno-common"
   </pre></details>
   
        - What is your client version?
       
   V4_8_0
   
        - What is your RocketMQ version?
   
   V4_8_0
   
   3. Other information (e.g. detailed explanation, logs, related issues, 
suggestions on how to fix, etc):
   
   One workaround is to sleep for a while after the consumer is shut down, so that all in-flight consumption can finish before the process exits.
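   A minimal sketch of that workaround in the application's shutdown path (the 10-second value here is an assumption; it has to exceed the longest expected message-processing time):
   
   ```
   if err := consumer.Shutdown(); err != nil {
        fmt.Println("shutdown error:", err)
   }
   // Hope that all in-flight consume callbacks finish within this window.
   time.Sleep(10 * time.Second)
   ```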
   
   A better solution is for the consumer's shutdown to return only after all of its active consumption has finished. We could add a field to the `pushConsumer` struct, like
   
   ```
   type pushConsumer struct {
        *defaultConsumer
        queueFlowControlTimes        int
        queueMaxSpanFlowControlTimes int
        consumeFunc                  utils.Set
        submitToConsume              func(*processQueue, *primitive.MessageQueue)
        subscribedTopic              map[string]string
        interceptor                  primitive.Interceptor
        queueLock                    *QueueLock
        done                         chan struct{}
        closeOnce                    sync.Once
        activeConsuming              int32 // new field: counts in-flight consume calls
   }
   ```
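   The consume callbacks run in separate goroutines, so this counter must be updated via `sync/atomic` (hence `int32`); a plain `int` incremented from multiple goroutines would be a data race.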
   
   Then, in the consume message call, increment the counter before consumption starts and decrement it after it finishes. Something like
   
   ```
   func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.MessageQueue) {
        msgs := pq.getMessages()
        if msgs == nil {
                return
        }
        for count := 0; count < len(msgs); count++ {
                var subMsgs []*primitive.MessageExt
                if count+pc.option.ConsumeMessageBatchMaxSize > len(msgs) {
                        subMsgs = msgs[count:]
                        count = len(msgs)
                } else {
                        next := count + pc.option.ConsumeMessageBatchMaxSize
                        subMsgs = msgs[count:next]
                        count = next - 1
                }
                go primitive.WithRecover(func() {
                        atomic.AddInt32(&pc.activeConsuming, 1)
                        fmt.Printf("pc active consuming number: %d\n", atomic.LoadInt32(&pc.activeConsuming))
                        defer func() {
                                atomic.AddInt32(&pc.activeConsuming, -1)
                                fmt.Printf("pc active consuming number later: %d\n", atomic.LoadInt32(&pc.activeConsuming))
                        }()
                RETRY:
                            // more consuming happens here.
   ```
   
   Later, in the consumer's shutdown, do something like
   
   ```
   func (pc *pushConsumer) Shutdown() error {
        var err error
        pc.closeOnce.Do(func() {
                close(pc.done)
   
                pc.client.UnregisterConsumer(pc.consumerGroup)
                err = pc.defaultConsumer.shutdown()
   
                // now wait until all in-flight consume calls have finished
                ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
                defer cancel()
                for {
                        if atomic.LoadInt32(&pc.activeConsuming) == 0 {
                                return
                        }
                        select {
                        case <-ctx.Done():
                                return
                        default:
                                // sleep briefly to avoid a hot busy-wait
                                time.Sleep(10 * time.Millisecond)
                        }
                }
        })
   
        return err
   }
   ```
   `Shutdown` then returns only after all of the consumer's active consumption has finished processing (or the 15-second timeout elapses).
   
   The idea is to make the consumer shut down as gracefully as [Go's http server shutdown](https://golang.org/pkg/net/http/#Server.Shutdown) does.
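   For comparison, here is a minimal sketch of the same idea built on `sync.WaitGroup`, which is closer to how `net/http` tracks in-flight work. The type and method names are illustrative, not part of the client:
   
   ```
   package consumer
   
   import (
        "errors"
        "sync"
        "time"
   )
   
   // gracefulTracker counts in-flight consume calls (illustrative only).
   type gracefulTracker struct {
        inFlight sync.WaitGroup
   }
   
   // run wraps a single consume call so that shutdown can wait for it.
   func (t *gracefulTracker) run(fn func()) {
        t.inFlight.Add(1)
        go func() {
                defer t.inFlight.Done()
                fn()
        }()
   }
   
   // wait blocks until all in-flight calls finish, or the timeout elapses.
   func (t *gracefulTracker) wait(timeout time.Duration) error {
        done := make(chan struct{})
        go func() {
                t.inFlight.Wait()
                close(done)
        }()
        select {
        case <-done:
                return nil
        case <-time.After(timeout):
                return errors.New("shutdown timed out with active consumers")
        }
   }
   ```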
   
   

