This is an automated email from the ASF dual-hosted git repository.

wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 7308bc9  [ISSUE #354] feat: Support PanicHandler (#355)
7308bc9 is described below

commit 7308bc94369320195652243059f63c71bfafc74b
Author: xujianhai666 <[email protected]>
AuthorDate: Tue Jan 7 20:25:44 2020 +0800

    [ISSUE #354] feat: Support PanicHandler (#355)
    
    * feat: Support PanicHandler
    
    Closes #354
---
 consumer/push_consumer.go        | 23 +++++++++++------------
 consumer/statistics.go           | 25 +++++++++++++------------
 errors.go                        |  1 -
 internal/client.go               | 17 ++++++++---------
 internal/remote/remote_client.go | 16 ++++++++++------
 internal/trace.go                | 16 ++++++++++++----
 internal/utils/fun.go            | 18 ------------------
 primitive/base.go                | 15 +++++++++++++++
 producer/producer.go             |  4 +++-
 9 files changed, 72 insertions(+), 63 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 0c7f224..731eb9d 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -66,7 +66,6 @@ type pushConsumer struct {
        subscribedTopic              map[string]string
        interceptor                  primitive.Interceptor
        queueLock                    *QueueLock
-       lockTicker                   *time.Ticker
        done                         chan struct{}
        closeOnce                    sync.Once
 }
@@ -107,7 +106,6 @@ func NewPushConsumer(opts ...Option) (*pushConsumer, error) 
{
                defaultConsumer: dc,
                subscribedTopic: make(map[string]string, 0),
                queueLock:       newQueueLock(),
-               lockTicker:      
time.NewTicker(dc.option.RebalanceLockInterval),
                done:            make(chan struct{}, 1),
                consumeFunc:     utils.NewSet(),
        }
@@ -168,14 +166,16 @@ func (pc *pushConsumer) Start() error {
                pc.Rebalance()
                time.Sleep(1 * time.Second)
 
-               go func() {
+               go primitive.WithRecover(func() {
                        // initial lock.
                        time.Sleep(1000 * time.Millisecond)
                        pc.lockAll()
 
+                       lockTicker := 
time.NewTicker(pc.option.RebalanceLockInterval)
+                       defer lockTicker.Stop()
                        for {
                                select {
-                               case <-pc.lockTicker.C:
+                               case <-lockTicker.C:
                                        pc.lockAll()
                                case <-pc.done:
                                        rlog.Info("push consumer close tick.", 
map[string]interface{}{
@@ -184,7 +184,7 @@ func (pc *pushConsumer) Start() error {
                                        return
                                }
                        }
-               }()
+               })
        })
 
        if err != nil {
@@ -209,7 +209,6 @@ func (pc *pushConsumer) Start() error {
 func (pc *pushConsumer) Shutdown() error {
        var err error
        pc.closeOnce.Do(func() {
-               pc.lockTicker.Stop()
                close(pc.done)
 
                pc.client.UnregisterConsumer(pc.consumerGroup)
@@ -438,7 +437,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
        })
        var sleepTime time.Duration
        pq := request.pq
-       go func() {
+       go primitive.WithRecover(func() {
                for {
                        select {
                        case <-pc.done:
@@ -450,7 +449,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
                                pc.submitToConsume(request.pq, request.mq)
                        }
                }
-       }()
+       })
 
        for {
        NEXT:
@@ -683,13 +682,13 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) 
{
                        })
                        request.nextOffset = result.NextBeginOffset
                        pq.WithDropped(true)
-                       go func() {
+                       go primitive.WithRecover(func() {
                                time.Sleep(10 * time.Second)
                                pc.storage.update(request.mq, 
request.nextOffset, false)
                                
pc.storage.persist([]*primitive.MessageQueue{request.mq})
                                pc.storage.remove(request.mq)
                                rlog.Warning(fmt.Sprintf("fix the pull request 
offset: %s", request.String()), nil)
-                       }()
+                       })
                default:
                        rlog.Warning(fmt.Sprintf("unknown pull status: %v", 
result.Status), nil)
                        sleepTime = _PullDelayTimeWhenError
@@ -866,7 +865,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq 
*processQueue, mq *primitive.
                        subMsgs = msgs[count:next]
                        count = next - 1
                }
-               go func() {
+               go primitive.WithRecover(func() {
                RETRY:
                        if pq.IsDroppd() {
                                rlog.Info("the message queue not be able to 
consume, because it was dropped", map[string]interface{}{
@@ -948,7 +947,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq 
*processQueue, mq *primitive.
                                        "message":               msgs,
                                })
                        }
-               }()
+               })
        }
 }
 
diff --git a/consumer/statistics.go b/consumer/statistics.go
index a58ff9e..fdc6379 100644
--- a/consumer/statistics.go
+++ b/consumer/statistics.go
@@ -24,6 +24,7 @@ import (
        "sync/atomic"
        "time"
 
+       "github.com/apache/rocketmq-client-go/primitive"
        "github.com/apache/rocketmq-client-go/rlog"
 )
 
@@ -148,7 +149,7 @@ func newStatsItemSet(statsName string) *statsItemSet {
 }
 
 func (sis *statsItemSet) init() {
-       go func() {
+       go primitive.WithRecover(func() {
                ticker := time.NewTicker(10 * time.Second)
                defer ticker.Stop()
                for {
@@ -160,9 +161,9 @@ func (sis *statsItemSet) init() {
 
                        }
                }
-       }()
+       })
 
-       go func() {
+       go primitive.WithRecover(func() {
                ticker := time.NewTicker(10 * time.Minute)
                defer ticker.Stop()
                for {
@@ -173,9 +174,9 @@ func (sis *statsItemSet) init() {
                                sis.samplingInMinutes()
                        }
                }
-       }()
+       })
 
-       go func() {
+       go primitive.WithRecover(func() {
                ticker := time.NewTicker(time.Hour)
                defer ticker.Stop()
                for {
@@ -186,9 +187,9 @@ func (sis *statsItemSet) init() {
                                sis.samplingInHour()
                        }
                }
-       }()
+       })
 
-       go func() {
+       go primitive.WithRecover(func() {
                time.Sleep(nextMinutesTime().Sub(time.Now()))
                ticker := time.NewTicker(time.Minute)
                defer ticker.Stop()
@@ -200,9 +201,9 @@ func (sis *statsItemSet) init() {
                                sis.printAtMinutes()
                        }
                }
-       }()
+       })
 
-       go func() {
+       go primitive.WithRecover(func() {
                time.Sleep(nextHourTime().Sub(time.Now()))
                ticker := time.NewTicker(time.Hour)
                defer ticker.Stop()
@@ -214,9 +215,9 @@ func (sis *statsItemSet) init() {
                                sis.printAtHour()
                        }
                }
-       }()
+       })
 
-       go func() {
+       go primitive.WithRecover(func() {
                time.Sleep(nextMonthTime().Sub(time.Now()))
                ticker := time.NewTicker(24 * time.Hour)
                defer ticker.Stop()
@@ -228,7 +229,7 @@ func (sis *statsItemSet) init() {
                                sis.printAtDay()
                        }
                }
-       }()
+       })
 }
 
 func (sis *statsItemSet) samplingInSeconds() {
diff --git a/errors.go b/errors.go
index 6a774aa..fe9ba33 100644
--- a/errors.go
+++ b/errors.go
@@ -22,7 +22,6 @@ import (
 )
 
 var (
-       // ErrRequestTimeout for request timeout error
        ErrRequestTimeout = errors.New("request timeout")
        ErrMQEmpty        = errors.New("MessageQueue is nil")
        ErrOffset         = errors.New("offset < 0")
diff --git a/internal/client.go b/internal/client.go
index ca8cc88..86a02cb 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -282,7 +282,7 @@ func (c *rmqClient) Start() {
                }
 
                // schedule update route info
-               go func() {
+               go primitive.WithRecover(func() {
                        // delay
                        ticker := time.NewTicker(_PullNameServerInterval)
                        defer ticker.Stop()
@@ -298,10 +298,9 @@ func (c *rmqClient) Start() {
                                        return
                                }
                        }
-               }()
+               })
 
-               // TODO cleanOfflineBroker & sendHeartbeatToAllBrokerWithLock
-               go func() {
+               go primitive.WithRecover(func() {
                        ticker := time.NewTicker(_HeartbeatBrokerInterval)
                        defer ticker.Stop()
                        for {
@@ -316,10 +315,10 @@ func (c *rmqClient) Start() {
                                        return
                                }
                        }
-               }()
+               })
 
                // schedule persist offset
-               go func() {
+               go primitive.WithRecover(func() {
                        ticker := time.NewTicker(_PersistOffsetInterval)
                        defer ticker.Stop()
                        for {
@@ -342,9 +341,9 @@ func (c *rmqClient) Start() {
                                        return
                                }
                        }
-               }()
+               })
 
-               go func() {
+               go primitive.WithRecover(func() {
                        ticker := time.NewTicker(_RebalanceInterval)
                        defer ticker.Stop()
                        for {
@@ -358,7 +357,7 @@ func (c *rmqClient) Start() {
                                        return
                                }
                        }
-               }()
+               })
        })
 }
 
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index 3d2bf7f..58abd3a 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -95,7 +95,9 @@ func (c *remotingClient) InvokeAsync(ctx context.Context, 
addr string, request *
        if err != nil {
                return err
        }
-       go c.receiveAsync(resp)
+       go primitive.WithRecover(func() {
+               c.receiveAsync(resp)
+       })
        return nil
 }
 
@@ -127,7 +129,9 @@ func (c *remotingClient) connect(ctx context.Context, addr 
string) (*tcpConnWrap
                return nil, err
        }
        c.connectionTable.Store(addr, tcpConn)
-       go c.receiveResponse(tcpConn)
+       go primitive.WithRecover(func() {
+               c.receiveResponse(tcpConn)
+       })
        return tcpConn, nil
 }
 
@@ -196,20 +200,20 @@ func (c *remotingClient) processCMD(cmd *RemotingCommand, 
r *tcpConnWrapper) {
                if exist {
                        c.responseTable.Delete(cmd.Opaque)
                        responseFuture := resp.(*ResponseFuture)
-                       go func() {
+                       go primitive.WithRecover(func() {
                                responseFuture.ResponseCommand = cmd
                                responseFuture.executeInvokeCallback()
                                if responseFuture.Done != nil {
                                        responseFuture.Done <- true
                                }
-                       }()
+                       })
                }
        } else {
                f := c.processors[cmd.Code]
                if f != nil {
                        // single goroutine will be deadlock
                        // TODO: optimize with goroutine pool, 
https://github.com/apache/rocketmq-client-go/issues/307
-                       go func() {
+                       go primitive.WithRecover(func() {
                                res := f(cmd, r.RemoteAddr())
                                if res != nil {
                                        res.Opaque = cmd.Opaque
@@ -222,7 +226,7 @@ func (c *remotingClient) processCMD(cmd *RemotingCommand, r 
*tcpConnWrapper) {
                                                })
                                        }
                                }
-                       }()
+                       })
                } else {
                        rlog.Warning("receive broker's requests, but no func to 
handle", map[string]interface{}{
                                "responseCode": cmd.Code,
diff --git a/internal/trace.go b/internal/trace.go
index 212ee60..9549bba 100644
--- a/internal/trace.go
+++ b/internal/trace.go
@@ -257,7 +257,9 @@ func (td *traceDispatcher) GetTraceTopicName() string {
 func (td *traceDispatcher) Start() {
        td.running = true
        td.cli.Start()
-       go td.process()
+       go primitive.WithRecover(func() {
+               td.process()
+       })
 }
 
 func (td *traceDispatcher) Close() {
@@ -299,7 +301,9 @@ func (td *traceDispatcher) process() {
                        batch = append(batch, ctx)
                        if count == batchSize {
                                count = 0
-                               go td.batchCommit(batch)
+                               go primitive.WithRecover(func() {
+                                       td.batchCommit(batch)
+                               })
                                batch = make([]TraceContext, 0)
                        }
                case <-td.ticker.C:
@@ -308,12 +312,16 @@ func (td *traceDispatcher) process() {
                                count++
                                lastput = time.Now()
                                if len(batch) > 0 {
-                                       go td.batchCommit(batch)
+                                       go primitive.WithRecover(func() {
+                                               td.batchCommit(batch)
+                                       })
                                        batch = make([]TraceContext, 0)
                                }
                        }
                case <-td.ctx.Done():
-                       go td.batchCommit(batch)
+                       go primitive.WithRecover(func() {
+                               td.batchCommit(batch)
+                       })
                        batch = make([]TraceContext, 0)
 
                        now := time.Now().UnixNano() / int64(time.Millisecond)
diff --git a/internal/utils/fun.go b/internal/utils/fun.go
deleted file mode 100644
index 4e9e4ea..0000000
--- a/internal/utils/fun.go
+++ /dev/null
@@ -1,18 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package utils
diff --git a/primitive/base.go b/primitive/base.go
index ae0f06f..efc48ef 100644
--- a/primitive/base.go
+++ b/primitive/base.go
@@ -80,3 +80,18 @@ func verifyIP(ip string) error {
        }
        return nil
 }
+
+var PanicHandler func(interface{})
+
+func WithRecover(fn func()) {
+       defer func() {
+               handler := PanicHandler
+               if handler != nil {
+                       if err := recover(); err != nil {
+                               handler(err)
+                       }
+               }
+       }()
+
+       fn()
+}
diff --git a/producer/producer.go b/producer/producer.go
index a47d76f..0762762 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -411,7 +411,9 @@ func NewTransactionProducer(listener 
primitive.TransactionListener, opts ...Opti
 }
 
 func (tp *transactionProducer) Start() error {
-       go tp.checkTransactionState()
+       go primitive.WithRecover(func() {
+               tp.checkTransactionState()
+       })
        return tp.producer.Start()
 }
 func (tp *transactionProducer) Shutdown() error {

Reply via email to