little-cui closed pull request #325: SCB-472 Null point reference in zipkin plugin
URL: https://github.com/apache/incubator-servicecomb-service-center/pull/325
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request was opened from a fork, GitHub no longer shows
its diff once it has been merged, so the full diff is reproduced below:

diff --git a/etc/conf/app.conf b/etc/conf/app.conf
index 02786d95..d468dbe2 100644
--- a/etc/conf/app.conf
+++ b/etc/conf/app.conf
@@ -108,6 +108,12 @@ log_format = text
 # whether enable record syslog
 log_sys = false
 
+###################################################################
+# Frontend Configurations
+###################################################################
+frontend_host_ip=127.0.0.1
+frontend_host_port=30103
+
 ###################################################################
 # above is the global configurations
 # you can overide above configuration in specific env
@@ -120,9 +126,3 @@ logfile = ./service-center.log
 [dev]
 loglevel = DEBUG
 logfile = ""
-
-###################################################################
-# Frontend Configurations
-###################################################################
-frontend_host_ip=127.0.0.1
-frontend_host_port=30103
diff --git a/main.go b/main.go
index d76e34cb..3d7ac976 100644
--- a/main.go
+++ b/main.go
@@ -18,8 +18,18 @@ package main
 
 // plugins
 import _ 
"github.com/apache/incubator-servicecomb-service-center/server/bootstrap"
-import "github.com/apache/incubator-servicecomb-service-center/server"
+import (
+       "github.com/apache/incubator-servicecomb-service-center/pkg/util"
+       "github.com/apache/incubator-servicecomb-service-center/server"
+       
"github.com/apache/incubator-servicecomb-service-center/server/core/backend"
+)
 
 func main() {
        server.Run()
+
+       util.GoCloseAndWait()
+
+       backend.Registry().Close()
+
+       util.Logger().Warn("service center exited", nil)
 }
diff --git a/pkg/async/async_task.go b/pkg/async/async_task.go
index 14727a17..73ceb384 100644
--- a/pkg/async/async_task.go
+++ b/pkg/async/async_task.go
@@ -39,6 +39,7 @@ type scheduler struct {
        queue      *util.UniQueue
        latestTask AsyncTask
        once       sync.Once
+       goroutine  *util.GoRoutine
 }
 
 func (s *scheduler) AddTask(ctx context.Context, task AsyncTask) (err error) {
@@ -47,7 +48,7 @@ func (s *scheduler) AddTask(ctx context.Context, task 
AsyncTask) (err error) {
        }
 
        s.once.Do(func() {
-               go s.do()
+               s.goroutine.Do(s.do)
        })
 
        err = s.queue.Put(ctx, task)
@@ -57,15 +58,17 @@ func (s *scheduler) AddTask(ctx context.Context, task 
AsyncTask) (err error) {
        return s.latestTask.Err()
 }
 
-func (s *scheduler) do() {
+func (s *scheduler) do(ctx context.Context) {
        for {
                select {
+               case <-ctx.Done():
+                       return
                case task, ok := <-s.queue.Chan():
                        if !ok {
                                return
                        }
                        at := task.(AsyncTask)
-                       at.Do(context.Background())
+                       at.Do(ctx)
                        s.latestTask = at
                }
        }
@@ -73,6 +76,15 @@ func (s *scheduler) do() {
 
 func (s *scheduler) Close() {
        s.queue.Close()
+       s.goroutine.Close(true)
+}
+
+func newScheduler(task AsyncTask) *scheduler {
+       return &scheduler{
+               queue:      util.NewUniQueue(),
+               latestTask: task,
+               goroutine:  util.NewGo(context.Background()),
+       }
 }
 
 type AsyncTaskService struct {
@@ -99,10 +111,7 @@ func (lat *AsyncTaskService) getOrNewScheduler(task 
AsyncTask) (s *scheduler, is
                s, ok = lat.schedules[key]
                if !ok {
                        isNew = true
-                       s = &scheduler{
-                               queue:      util.NewUniQueue(),
-                               latestTask: task,
-                       }
+                       s = newScheduler(task)
                        lat.schedules[key] = s
                }
                lat.lock.Unlock()
@@ -166,11 +175,11 @@ func (lat *AsyncTaskService) LatestHandled(key string) 
(AsyncTask, error) {
        return s.latestTask, nil
 }
 
-func (lat *AsyncTaskService) daemon(stopCh <-chan struct{}) {
+func (lat *AsyncTaskService) daemon(ctx context.Context) {
        util.SafeCloseChan(lat.ready)
        for {
                select {
-               case <-stopCh:
+               case <-ctx.Done():
                        util.Logger().Debugf("daemon thread exited for 
AsyncTaskService is stopped")
                        return
                case <-time.After(DEFAULT_REMOVE_TASKS_INTERVAL):
@@ -228,7 +237,7 @@ func NewAsyncTaskService() *AsyncTaskService {
        return &AsyncTaskService{
                schedules:   make(map[string]*scheduler, 
DEFAULT_MAX_SCHEDULE_COUNT),
                removeTasks: make(map[string]struct{}, 
DEFAULT_MAX_SCHEDULE_COUNT),
-               goroutine:   util.NewGo(make(chan struct{})),
+               goroutine:   util.NewGo(context.Background()),
                ready:       make(chan struct{}),
                isClose:     true,
        }
diff --git a/pkg/chain/callback.go b/pkg/chain/callback.go
index 9b2e7fa2..36577402 100644
--- a/pkg/chain/callback.go
+++ b/pkg/chain/callback.go
@@ -47,11 +47,7 @@ func (cb *Callback) Invoke(r Result) {
 }
 
 func syncInvoke(f func(r Result), r Result) {
-       defer func() {
-               if itf := recover(); itf != nil {
-                       util.LogPanic(itf)
-               }
-       }()
+       defer util.RecoverAndReport()
        if f == nil {
                util.Logger().Errorf(nil, "Callback function is nil. result: 
%s,", r)
                return
diff --git a/pkg/etcdsync/mutex.go b/pkg/etcdsync/mutex.go
index f54ef6f5..d1a35b94 100644
--- a/pkg/etcdsync/mutex.go
+++ b/pkg/etcdsync/mutex.go
@@ -134,7 +134,8 @@ func (m *DLock) Lock(wait bool) error {
                util.Logger().Warnf(err, "Key %s is locked, waiting for other 
node releases it, id=%s", m.builder.key, m.id)
 
                ctx, cancel := context.WithTimeout(m.builder.ctx, 
DEFAULT_LOCK_TTL*time.Second)
-               go func() {
+               util.Go(func(context.Context) {
+                       defer cancel()
                        err := backend.Registry().Watch(ctx,
                                registry.WithStrKey(m.builder.key),
                                registry.WithWatchCallback(
@@ -146,10 +147,9 @@ func (m *DLock) Lock(wait bool) error {
                                                return nil
                                        }))
                        if err != nil {
-                               util.Logger().Errorf(nil, "%s, key=%s, id=%s", 
err.Error(), m.builder.key, m.id)
+                               util.Logger().Warnf(nil, "%s, key=%s, id=%s", 
err.Error(), m.builder.key, m.id)
                        }
-                       cancel()
-               }()
+               })
                select {
                case <-ctx.Done():
                        continue // 可以重新尝试获取锁
diff --git a/pkg/grace/grace.go b/pkg/grace/grace.go
index fef6cafd..3ec7cd35 100644
--- a/pkg/grace/grace.go
+++ b/pkg/grace/grace.go
@@ -20,6 +20,7 @@ import (
        "flag"
        "fmt"
        "github.com/apache/incubator-servicecomb-service-center/pkg/util"
+       "golang.org/x/net/context"
        "os"
        "os/exec"
        "os/signal"
@@ -71,7 +72,7 @@ func Init() {
                flag.Parse()
        }
 
-       go handleSignals()
+       util.Go(handleSignals)
 }
 
 func Before(f func()) {
@@ -111,26 +112,28 @@ func fireSignalHook(ppFlag int, sig os.Signal) {
        }
 }
 
-func handleSignals() {
+func handleSignals(ctx context.Context) {
        var sig os.Signal
 
        sigCh := make(chan os.Signal)
        signal.Notify(sigCh, registerSignals...)
 
        for {
-               sig = <-sigCh
-               fireSignalHook(PreSignal, sig)
-               switch sig {
-               case syscall.SIGHUP:
-                       util.Logger().Debugf("received signal 'SIGHUP', now 
forking")
-                       err := fork()
-                       if err != nil {
-                               util.Logger().Errorf(err, "fork a process 
failed")
+               select {
+               case <-ctx.Done():
+                       return
+               case sig = <-sigCh:
+                       fireSignalHook(PreSignal, sig)
+                       switch sig {
+                       case syscall.SIGHUP:
+                               util.Logger().Debugf("received signal '%v', now 
forking", sig)
+                               err := fork()
+                               if err != nil {
+                                       util.Logger().Errorf(err, "fork a 
process failed")
+                               }
                        }
-               default:
-                       util.Logger().Warnf(nil, "received signal '%v'", sig)
+                       fireSignalHook(PostSignal, sig)
                }
-               fireSignalHook(PostSignal, sig)
        }
 }
 
diff --git a/pkg/util/goroutines.go b/pkg/util/goroutines.go
index a021f52c..bb9b9d99 100644
--- a/pkg/util/goroutines.go
+++ b/pkg/util/goroutines.go
@@ -16,42 +16,37 @@
  */
 package util
 
-import "sync"
+import (
+       "golang.org/x/net/context"
+       "sync"
+)
 
 type GoRoutine struct {
-       stopCh chan struct{}
+       ctx    context.Context
+       cancel context.CancelFunc
        wg     sync.WaitGroup
        mux    sync.RWMutex
-       once   sync.Once
        closed bool
 }
 
-func (g *GoRoutine) Init(stopCh chan struct{}) {
-       g.once.Do(func() {
-               g.stopCh = stopCh
-       })
-}
-
-func (g *GoRoutine) StopCh() <-chan struct{} {
-       return g.stopCh
-}
-
-func (g *GoRoutine) Do(f func(<-chan struct{})) {
+func (g *GoRoutine) Do(f func(context.Context)) {
        g.wg.Add(1)
        go func() {
                defer g.wg.Done()
-               f(g.StopCh())
+               defer RecoverAndReport()
+               f(g.ctx)
        }()
 }
 
 func (g *GoRoutine) Close(wait bool) {
        g.mux.Lock()
        defer g.mux.Unlock()
+
        if g.closed {
                return
        }
        g.closed = true
-       close(g.stopCh)
+       g.cancel()
        if wait {
                g.Wait()
        }
@@ -61,27 +56,26 @@ func (g *GoRoutine) Wait() {
        g.wg.Wait()
 }
 
-var defaultGo GoRoutine
+var defaultGo *GoRoutine
 
 func init() {
-       GoInit()
+       defaultGo = NewGo(context.Background())
 }
 
-func Go(f func(<-chan struct{})) {
+func Go(f func(context.Context)) {
        defaultGo.Do(f)
 }
 
-func GoInit() {
-       defaultGo.Init(make(chan struct{}))
-}
-
 func GoCloseAndWait() {
        defaultGo.Close(true)
-       Logger().Debugf("all goroutines quit normally")
+       Logger().Debugf("all goroutines exited")
 }
 
-func NewGo(stopCh chan struct{}) *GoRoutine {
-       gr := &GoRoutine{}
-       gr.Init(stopCh)
+func NewGo(ctx context.Context) *GoRoutine {
+       ctx, cancel := context.WithCancel(ctx)
+       gr := &GoRoutine{
+               ctx:    ctx,
+               cancel: cancel,
+       }
        return gr
 }
diff --git a/pkg/util/goroutines_test.go b/pkg/util/goroutines_test.go
index cfc09191..d8088142 100644
--- a/pkg/util/goroutines_test.go
+++ b/pkg/util/goroutines_test.go
@@ -18,72 +18,66 @@ package util
 
 import (
        "fmt"
+       "golang.org/x/net/context"
        "sync"
        "testing"
        "time"
 )
 
-func TestGoRoutine_Init(t *testing.T) {
-       var test GoRoutine
-       stopCh1 := make(chan struct{})
-       defer close(stopCh1)
-       stopCh2 := make(chan struct{})
-       defer close(stopCh2)
-
-       test.Init(stopCh1)
-       c := test.StopCh()
-       if c != stopCh1 {
-               fail(t, "init GoRoutine failed.")
-       }
-
-       test.Init(stopCh2)
-       c = test.StopCh()
-       if c == stopCh2 {
-               fail(t, "init GoRoutine twice.")
-       }
-}
-
 func TestGoRoutine_Do(t *testing.T) {
-       var test1 GoRoutine
-       stopCh := make(chan struct{})
-       test1.Init(make(chan struct{}))
-       test1.Do(func(neverStopCh <-chan struct{}) {
-               defer close(stopCh)
+       test1 := NewGo(context.Background())
+       defer test1.Close(true)
+       stopCh1 := make(chan struct{})
+       test1.Do(func(ctx context.Context) {
+               defer close(stopCh1)
                select {
-               case <-neverStopCh:
-                       fail(t, "neverStopCh should not be closed.")
+               case <-ctx.Done():
+                       fail(t, "ctx should not be done.")
                case <-time.After(time.Second):
                }
        })
-       <-stopCh
+       <-stopCh1
 
-       var test2 GoRoutine
-       stopCh1 := make(chan struct{})
+       ctx, cancel := context.WithCancel(context.Background())
+       test2 := NewGo(ctx)
+       defer test2.Close(true)
        stopCh2 := make(chan struct{})
-       test2.Init(stopCh1)
-       test2.Do(func(stopCh <-chan struct{}) {
+       test2.Do(func(ctx context.Context) {
                defer close(stopCh2)
                select {
-               case <-stopCh:
+               case <-ctx.Done():
                case <-time.After(time.Second):
-                       fail(t, "time out to wait stopCh1 close.")
+                       fail(t, "time out to wait stopCh2 close.")
                }
        })
-       close(stopCh1)
+       cancel()
        <-stopCh2
+
+       ctx, _ = context.WithTimeout(context.Background(), 0)
+       test3 := NewGo(ctx)
+       defer test3.Close(true)
+       stopCh3 := make(chan struct{})
+       test3.Do(func(ctx context.Context) {
+               defer close(stopCh3)
+               select {
+               case <-ctx.Done():
+               case <-time.After(time.Second):
+                       fail(t, "time out to wait ctx done.")
+               }
+       })
+       <-stopCh3
 }
 
 func TestGoRoutine_Wait(t *testing.T) {
-       var test GoRoutine
        var mux sync.Mutex
        MAX := 10
        resultArr := make([]int, 0, MAX)
-       test.Init(make(chan struct{}))
+       test := NewGo(context.Background())
        for i := 0; i < MAX; i++ {
                func(i int) {
-                       test.Do(func(neverStopCh <-chan struct{}) {
+                       test.Do(func(ctx context.Context) {
                                select {
-                               case <-neverStopCh:
+                               case <-ctx.Done():
                                case <-time.After(time.Second):
                                        mux.Lock()
                                        resultArr = append(resultArr, i)
@@ -103,13 +97,12 @@ func TestGoRoutine_Wait(t *testing.T) {
 }
 
 func TestGoRoutine_Close(t *testing.T) {
-       var test GoRoutine
-       test.Init(make(chan struct{}))
-       test.Do(func(stopCh <-chan struct{}) {
+       test := NewGo(context.Background())
+       test.Do(func(ctx context.Context) {
                select {
-               case <-stopCh:
+               case <-ctx.Done():
                case <-time.After(time.Second):
-                       fail(t, "time out to wait stopCh close.")
+                       fail(t, "time out to wait ctx close.")
                }
        })
        test.Close(true)
@@ -117,20 +110,18 @@ func TestGoRoutine_Close(t *testing.T) {
 }
 
 func TestGo(t *testing.T) {
-       GoInit()
-       Go(func(stopCh <-chan struct{}) {
+       Go(func(ctx context.Context) {
                for {
                        select {
-                       case <-stopCh:
+                       case <-ctx.Done():
                                return
                        case <-time.After(time.Second):
                        }
                }
        })
+       Go(func(ctx context.Context) {
+               var a *int
+               fmt.Println(*a)
+       })
        GoCloseAndWait()
 }
-
-func TestNewGo(t *testing.T) {
-       g := NewGo(make(chan struct{}))
-       defer g.Close(true)
-}
diff --git a/pkg/util/log.go b/pkg/util/log.go
index 5e475a7b..bc82a899 100644
--- a/pkg/util/log.go
+++ b/pkg/util/log.go
@@ -20,6 +20,7 @@ import (
        "fmt"
        "github.com/ServiceComb/paas-lager"
        
"github.com/ServiceComb/paas-lager/third_party/forked/cloudfoundry/lager"
+       "golang.org/x/net/context"
        "os"
        "path/filepath"
        "runtime"
@@ -57,7 +58,7 @@ func init() {
        loggers = make(map[string]lager.Logger, 10)
        loggerNames = make(map[string]string, 10)
        // make LOGGER do not be nil, new a stdout logger
-       LOGGER = newLogger(fromLagerConfig(defaultLagerConfig))
+       LOGGER = NewLogger(fromLagerConfig(defaultLagerConfig))
 }
 
 func fromLagerConfig(c *stlager.Config) LoggerConfig {
@@ -82,7 +83,7 @@ func toLagerConfig(c LoggerConfig) stlager.Config {
 }
 
 // newLog new log, unsafe
-func newLogger(cfg LoggerConfig) lager.Logger {
+func NewLogger(cfg LoggerConfig) lager.Logger {
        stlager.Init(toLagerConfig(cfg))
        return stlager.NewLogger(cfg.LoggerFile)
 }
@@ -93,7 +94,7 @@ func InitGlobalLogger(cfg LoggerConfig) {
                cfg.LoggerLevel = defaultLagerConfig.LoggerLevel
        }
        loggerConfig = cfg
-       LOGGER = newLogger(cfg)
+       LOGGER = NewLogger(cfg)
        // log rotate
        RunLogDirRotate(cfg)
        // recreate the deleted log file
@@ -144,7 +145,7 @@ func Logger() lager.Logger {
                        if len(cfg.LoggerFile) != 0 {
                                cfg.LoggerFile = 
filepath.Join(filepath.Dir(cfg.LoggerFile), logFile+".log")
                        }
-                       logger = newLogger(cfg)
+                       logger = NewLogger(cfg)
                        loggers[logFile] = logger
                        LOGGER.Warnf(nil, "match %s, new logger %s for %s", 
prefix, logFile, funcFullName)
                }
@@ -190,10 +191,10 @@ func monitorLogFile() {
        if len(loggerConfig.LoggerFile) == 0 {
                return
        }
-       Go(func(stopCh <-chan struct{}) {
+       Go(func(ctx context.Context) {
                for {
                        select {
-                       case <-stopCh:
+                       case <-ctx.Done():
                                return
                        case <-time.After(time.Minute):
                                Logger().Debug(fmt.Sprintf("Check log file at 
%s", time.Now()))
diff --git a/pkg/util/logrotate.go b/pkg/util/logrotate.go
index db4b79f9..e7dbcefd 100644
--- a/pkg/util/logrotate.go
+++ b/pkg/util/logrotate.go
@@ -19,6 +19,7 @@ package util
 import (
        "archive/zip"
        "fmt"
+       "golang.org/x/net/context"
        "io"
        "os"
        "path/filepath"
@@ -293,10 +294,10 @@ func CopyFile(srcFile, destFile string) error {
 }
 
 func RunLogDirRotate(cfg LoggerConfig) {
-       Go(func(stopCh <-chan struct{}) {
+       Go(func(ctx context.Context) {
                for {
                        select {
-                       case <-stopCh:
+                       case <-ctx.Done():
                                return
                        case <-time.After(cfg.LogRotatePeriod):
                                LogRotate(filepath.Dir(cfg.LoggerFile), 
cfg.LogRotateSize, cfg.LogBackupCount)
diff --git a/server/api.go b/server/api.go
index 26ac55a2..f434bdc4 100644
--- a/server/api.go
+++ b/server/api.go
@@ -36,8 +36,9 @@ func init() {
        InitAPI()
 
        apiServer = &APIServer{
-               isClose: true,
-               err:     make(chan error, 1),
+               isClose:   true,
+               err:       make(chan error, 1),
+               goroutine: util.NewGo(context.Background()),
        }
 }
 
@@ -66,6 +67,7 @@ type APIServer struct {
        isClose   bool
        forked    bool
        err       chan error
+       goroutine *util.GoRoutine
 }
 
 const (
@@ -176,16 +178,18 @@ func (s *APIServer) doAPIServerHeartBeat(pCtx 
context.Context) {
 }
 
 func (s *APIServer) startHeartBeatService() {
-       go func() {
+       s.goroutine.Do(func(ctx context.Context) {
                for {
                        select {
+                       case <-ctx.Done():
+                               return
                        case <-s.err:
                                return
                        case 
<-time.After(time.Duration(core.Instance.HealthCheck.Interval) * time.Second):
                                s.doAPIServerHeartBeat(context.Background())
                        }
                }
-       }()
+       })
 }
 
 func (s *APIServer) graceDone() {
@@ -211,14 +215,14 @@ func (s *APIServer) startRESTServer() (err error) {
        }
        util.Logger().Infof("Local listen address: %s, host: %s.", ep, 
s.HostName)
 
-       go func() {
+       s.goroutine.Do(func(_ context.Context) {
                err := s.restSrv.Serve()
                if s.isClose {
                        return
                }
                util.Logger().Errorf(err, "error to start REST API server %s", 
ep)
                s.err <- err
-       }()
+       })
        return
 }
 
@@ -234,14 +238,14 @@ func (s *APIServer) startRPCServer() (err error) {
        }
        util.Logger().Infof("Local listen address: %s, host: %s.", ep, 
s.HostName)
 
-       go func() {
+       s.goroutine.Do(func(_ context.Context) {
                err := s.rpcSrv.Serve()
                if s.isClose {
                        return
                }
                util.Logger().Errorf(err, "error to start RPC API server %s", 
ep)
                s.err <- err
-       }()
+       })
        return
 }
 
@@ -301,6 +305,8 @@ func (s *APIServer) Stop() {
 
        close(s.err)
 
+       s.goroutine.Close(true)
+
        util.Logger().Info("api server stopped.")
 }
 
diff --git a/server/broker/service.go b/server/broker/service.go
index bde8f3e8..3ae63c61 100644
--- a/server/broker/service.go
+++ b/server/broker/service.go
@@ -34,7 +34,7 @@ import (
        "golang.org/x/net/context"
 )
 
-var BrokerServiceAPI *BrokerService = &BrokerService{}
+var BrokerServiceAPI = &BrokerService{}
 
 type BrokerService struct {
 }
diff --git a/server/broker/store.go b/server/broker/store.go
index 0cdddc59..95c61a41 100644
--- a/server/broker/store.go
+++ b/server/broker/store.go
@@ -21,6 +21,7 @@ import (
 
        "github.com/apache/incubator-servicecomb-service-center/pkg/util"
        sstore 
"github.com/apache/incubator-servicecomb-service-center/server/core/backend/store"
+       "golang.org/x/net/context"
 )
 
 const (
@@ -72,12 +73,16 @@ func (s *BKvStore) newStore(t sstore.StoreType, opts 
...sstore.KvCacherCfgOption
        s.newIndexer(t, sstore.NewKvCacher(opts...))
 }
 
-func (s *BKvStore) store() {
+func (s *BKvStore) store(ctx context.Context) {
        for t := sstore.StoreType(0); t != typeEnd; t++ {
                s.newStore(t)
        }
        for _, i := range s.bindexers {
-               <-i.Ready()
+               select {
+               case <-ctx.Done():
+                       return
+               case <-i.Ready():
+               }
        }
        util.SafeCloseChan(s.bready)
 
@@ -120,7 +125,13 @@ func (s *BKvStore) newIndexer(t sstore.StoreType, cacher 
sstore.Cacher) {
 }
 
 func (s *BKvStore) Run() {
-       go s.store()
+       util.Go(func(ctx context.Context) {
+               s.store(ctx)
+               select {
+               case <-ctx.Done():
+                       s.Stop()
+               }
+       })
 }
 
 func (s *BKvStore) Ready() <-chan struct{} {
@@ -154,3 +165,18 @@ func (s *BKvStore) Verification() *sstore.Indexer {
 func (s *BKvStore) PactLatest() *sstore.Indexer {
        return s.bindexers[PACT_LATEST]
 }
+
+func (s *BKvStore) Stop() {
+       if s.bisClose {
+               return
+       }
+       s.bisClose = true
+
+       for _, i := range s.bindexers {
+               i.Stop()
+       }
+
+       util.SafeCloseChan(s.bready)
+
+       util.Logger().Debugf("broker store daemon stopped")
+}
diff --git a/server/broker/util.go b/server/broker/util.go
index 23e98ca1..895ee8ca 100644
--- a/server/broker/util.go
+++ b/server/broker/util.go
@@ -25,14 +25,16 @@ import (
        "strconv"
        "strings"
 
-       "github.com/ServiceComb/paas-lager"
        
"github.com/ServiceComb/paas-lager/third_party/forked/cloudfoundry/lager"
        "github.com/apache/incubator-servicecomb-service-center/pkg/util"
-       backend 
"github.com/apache/incubator-servicecomb-service-center/server/core/backend"
+       "github.com/apache/incubator-servicecomb-service-center/server/core"
+       
"github.com/apache/incubator-servicecomb-service-center/server/core/backend"
        pb 
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
        scerr 
"github.com/apache/incubator-servicecomb-service-center/server/error"
        
"github.com/apache/incubator-servicecomb-service-center/server/infra/registry"
        serviceUtil 
"github.com/apache/incubator-servicecomb-service-center/server/service/util"
+       "path/filepath"
+       "time"
 )
 
 var PactLogger lager.Logger
@@ -88,12 +90,18 @@ var brokerAPILinksTitles = map[string]string{
 
 func init() {
        //define Broker logger
-       stlager.Init(stlager.Config{
-               LoggerLevel:   "INFO",
-               LoggerFile:    "broker_srvc.log",
-               EnableRsyslog: false,
+       name := ""
+       if len(core.ServerInfo.Config.LogFilePath) != 0 {
+               name = 
filepath.Join(filepath.Dir(core.ServerInfo.Config.LogFilePath), 
"broker_srvc.log")
+       }
+       PactLogger = util.NewLogger(util.LoggerConfig{
+               LoggerLevel:     core.ServerInfo.Config.LogLevel,
+               LoggerFile:      name,
+               LogFormatText:   core.ServerInfo.Config.LogFormat == "text",
+               LogRotatePeriod: 30 * time.Second,
+               LogRotateSize:   int(core.ServerInfo.Config.LogRotateSize),
+               LogBackupCount:  int(core.ServerInfo.Config.LogBackupCount),
        })
-       PactLogger = stlager.NewLogger("broker_srvc")
 }
 
 func GetDefaultTenantProject() string {
diff --git a/server/core/0_init.go b/server/core/0_init.go
index 1b888492..8749af62 100644
--- a/server/core/0_init.go
+++ b/server/core/0_init.go
@@ -87,7 +87,6 @@ func initLogger() {
 }
 
 func handleSignals() {
-       var sig os.Signal
        sigCh := make(chan os.Signal)
        signal.Notify(sigCh,
                syscall.SIGINT,
@@ -95,13 +94,14 @@ func handleSignals() {
                syscall.SIGTERM,
        )
        wait := 5 * time.Second
-       for {
-               sig = <-sigCh
+       for sig := range sigCh {
                switch sig {
                case syscall.SIGINT, syscall.SIGKILL, syscall.SIGTERM:
                        <-time.After(wait)
-                       util.Logger().Warnf(nil, "Waiting for server response 
timed out(%s), force shutdown.", wait)
+                       util.Logger().Warnf(nil, "waiting for server response 
timed out(%s), force shutdown", wait)
                        os.Exit(1)
+               default:
+                       util.Logger().Warnf(nil, "received signal '%v'", sig)
                }
        }
 }
diff --git a/server/core/backend/store/cacher.go 
b/server/core/backend/store/cacher.go
index c6ade974..39ba733f 100644
--- a/server/core/backend/store/cacher.go
+++ b/server/core/backend/store/cacher.go
@@ -169,12 +169,12 @@ type KvCacher struct {
        lastRev         int64
        noEventInterval int
 
-       ready   chan struct{}
-       lw      ListWatcher
-       mux     sync.Mutex
-       once    sync.Once
-       cache   *KvCache
-       goroute *util.GoRoutine
+       ready     chan struct{}
+       lw        ListWatcher
+       mux       sync.Mutex
+       once      sync.Once
+       cache     *KvCache
+       goroutine *util.GoRoutine
 }
 
 func (c *KvCacher) needList() bool {
@@ -267,23 +267,18 @@ func (c *KvCacher) needDeferHandle(evts []*Event) bool {
        return c.Cfg.DeferHandler.OnCondition(c.Cache(), evts)
 }
 
-func (c *KvCacher) refresh(stopCh <-chan struct{}) {
+func (c *KvCacher) refresh(ctx context.Context) {
        util.Logger().Debugf("start to list and watch %s", c.Cfg)
-       ctx, cancel := context.WithCancel(context.Background())
-       c.goroute.Do(func(stopCh <-chan struct{}) {
-               defer cancel()
-               <-stopCh
-       })
        for {
                start := time.Now()
                c.ListAndWatch(ctx)
                watchDuration := time.Since(start)
-               nextPeriod := 0 * time.Second
+               nextPeriod := c.Cfg.Period
                if watchDuration > 0 && c.Cfg.Period > watchDuration {
                        nextPeriod = c.Cfg.Period - watchDuration
                }
                select {
-               case <-stopCh:
+               case <-ctx.Done():
                        util.Logger().Debugf("stop to list and watch %s", c.Cfg)
                        return
                case <-time.After(nextPeriod):
@@ -291,7 +286,7 @@ func (c *KvCacher) refresh(stopCh <-chan struct{}) {
        }
 }
 
-func (c *KvCacher) deferHandle(stopCh <-chan struct{}) {
+func (c *KvCacher) deferHandle(ctx context.Context) {
        if c.Cfg.DeferHandler == nil {
                return
        }
@@ -299,7 +294,7 @@ func (c *KvCacher) deferHandle(stopCh <-chan struct{}) {
        i, evts := 0, make([]*Event, event_block_size)
        for {
                select {
-               case <-stopCh:
+               case <-ctx.Done():
                        return
                case evt, ok := <-c.Cfg.DeferHandler.HandleChan():
                        if !ok {
@@ -524,8 +519,8 @@ func (c *KvCacher) onKvEvents(evts []*KvEvent) {
 }
 
 func (c *KvCacher) run() {
-       c.goroute.Do(c.refresh)
-       c.goroute.Do(c.deferHandle)
+       c.goroutine.Do(c.refresh)
+       c.goroutine.Do(c.deferHandle)
 }
 
 func (c *KvCacher) Cache() Cache {
@@ -537,7 +532,7 @@ func (c *KvCacher) Run() {
 }
 
 func (c *KvCacher) Stop() {
-       c.goroute.Close(true)
+       c.goroutine.Close(true)
 
        util.SafeCloseChan(c.ready)
 }
@@ -577,7 +572,7 @@ func NewKvCacher(opts ...KvCacherCfgOption) *KvCacher {
                        Client: backend.Registry(),
                        Key:    cfg.Key,
                },
-               goroute: util.NewGo(make(chan struct{})),
+               goroutine: util.NewGo(context.Background()),
        }
        cacher.cache = NewKvCache(cacher, cfg.InitSize)
        return cacher
diff --git a/server/core/backend/store/defer.go 
b/server/core/backend/store/defer.go
index d35f9734..43464f06 100644
--- a/server/core/backend/store/defer.go
+++ b/server/core/backend/store/defer.go
@@ -21,6 +21,7 @@ import (
        "github.com/apache/incubator-servicecomb-service-center/pkg/util"
        pb 
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
        "github.com/coreos/etcd/mvcc/mvccpb"
+       "golang.org/x/net/context"
        "sync"
        "time"
 )
@@ -98,12 +99,12 @@ func (iedh *InstanceEventDeferHandler) HandleChan() <-chan 
*Event {
        return iedh.deferCh
 }
 
-func (iedh *InstanceEventDeferHandler) check(stopCh <-chan struct{}) {
+func (iedh *InstanceEventDeferHandler) check(ctx context.Context) {
        defer util.RecoverAndReport()
        t, n := iedh.newTimer(), false
        for {
                select {
-               case <-stopCh:
+               case <-ctx.Done():
                        return
                case evts := <-iedh.pendingCh:
                        for _, evt := range evts {
@@ -117,7 +118,7 @@ func (iedh *InstanceEventDeferHandler) check(stopCh <-chan 
struct{}) {
                        }
 
                        total := iedh.cache.Size()
-                       if !iedh.enabled && del > 0 && total > 0 && 
float64(del) >= float64(total)*iedh.Percent {
+                       if !iedh.enabled && del > 0 && total > 5 && 
float64(del) >= float64(total)*iedh.Percent {
                                iedh.enabled = true
                                util.Logger().Warnf(nil, "self preservation is 
enabled, caught %d/%d(>=%.0f%%) DELETE events",
                                        del, total, iedh.Percent*100)
diff --git a/server/core/backend/store/indexer.go 
b/server/core/backend/store/indexer.go
index 72360503..5cf3e6dc 100644
--- a/server/core/backend/store/indexer.go
+++ b/server/core/backend/store/indexer.go
@@ -186,11 +186,11 @@ func (i *Indexer) OnCacheEvent(evt *KvEvent) {
 }
 
 func (i *Indexer) buildIndex() {
-       i.goroutine.Do(func(stopCh <-chan struct{}) {
+       i.goroutine.Do(func(ctx context.Context) {
                util.SafeCloseChan(i.ready)
                for {
                        select {
-                       case <-stopCh:
+                       case <-ctx.Done():
                                return
                        case evt, ok := <-i.prefixBuildQueue:
                                if !ok {
@@ -317,7 +317,7 @@ func NewCacheIndexer(t StoreType, cr Cacher) *Indexer {
                cacheType:        t,
                prefixIndex:      make(map[string]map[string]struct{}, 
DEFAULT_MAX_EVENT_COUNT),
                prefixBuildQueue: make(chan *KvEvent, DEFAULT_MAX_EVENT_COUNT),
-               goroutine:        util.NewGo(make(chan struct{})),
+               goroutine:        util.NewGo(context.Background()),
                ready:            make(chan struct{}),
                isClose:          true,
        }
diff --git a/server/core/backend/store/listwatch.go 
b/server/core/backend/store/listwatch.go
index 189f9094..4ffb9d45 100644
--- a/server/core/backend/store/listwatch.go
+++ b/server/core/backend/store/listwatch.go
@@ -131,17 +131,17 @@ func (w *Watcher) EventBus() <-chan []*Event {
        return w.bus
 }
 
-func (w *Watcher) process() {
+func (w *Watcher) process(_ context.Context) {
        stopCh := make(chan struct{})
        ctx, cancel := context.WithTimeout(w.ListOps.Context, w.ListOps.Timeout)
-       go func() {
+       util.Go(func(_ context.Context) {
                defer close(stopCh)
                w.lw.doWatch(ctx, w.sendEvent)
-       }()
+       })
 
        select {
        case <-stopCh:
-               // time out
+               // timed out or exception
                w.Stop()
        case <-w.stopCh:
                cancel()
@@ -180,6 +180,6 @@ func newWatcher(lw *ListWatcher, listOps *ListOptions) 
*Watcher {
                bus:     make(chan []*Event, EVENT_BUS_MAX_SIZE),
                stopCh:  make(chan struct{}),
        }
-       go w.process()
+       util.Go(w.process)
        return w
 }
diff --git a/server/core/backend/store/store.go 
b/server/core/backend/store/store.go
index 6a5f38c3..c462d411 100644
--- a/server/core/backend/store/store.go
+++ b/server/core/backend/store/store.go
@@ -109,6 +109,7 @@ type KvStore struct {
        asyncTaskSvc *async.AsyncTaskService
        lock         sync.RWMutex
        ready        chan struct{}
+       goroutine    *util.GoRoutine
        isClose      bool
 }
 
@@ -116,6 +117,7 @@ func (s *KvStore) Initialize() {
        s.indexers = make(map[StoreType]*Indexer)
        s.asyncTaskSvc = async.NewAsyncTaskService()
        s.ready = make(chan struct{})
+       s.goroutine = util.NewGo(context.Background())
 
        for i := StoreType(0); i != typeEnd; i++ {
                store.newNullStore(i)
@@ -147,7 +149,7 @@ func (s *KvStore) newIndexer(t StoreType, cacher Cacher) {
 }
 
 func (s *KvStore) Run() {
-       go s.store()
+       s.goroutine.Do(s.store)
        s.asyncTaskSvc.Run()
 }
 
@@ -166,7 +168,7 @@ func (s *KvStore) SelfPreservationHandler() DeferHandler {
        return &InstanceEventDeferHandler{Percent: 
DEFAULT_SELF_PRESERVATION_PERCENT}
 }
 
-func (s *KvStore) store() {
+func (s *KvStore) store(ctx context.Context) {
        for t := StoreType(0); t != typeEnd; t++ {
                switch t {
                case INSTANCE:
@@ -178,7 +180,11 @@ func (s *KvStore) store() {
                }
        }
        for _, i := range s.indexers {
-               <-i.Ready()
+               select {
+               case <-ctx.Done():
+                       return
+               case <-i.Ready():
+               }
        }
        util.SafeCloseChan(s.ready)
 
@@ -214,9 +220,11 @@ func (s *KvStore) Stop() {
 
        s.asyncTaskSvc.Stop()
 
+       s.goroutine.Close(true)
+
        util.SafeCloseChan(s.ready)
 
-       util.Logger().Debugf("store daemon stopped.")
+       util.Logger().Debugf("store daemon stopped")
 }
 
 func (s *KvStore) Ready() <-chan struct{} {
diff --git a/server/infra/registry/registry.go 
b/server/infra/registry/registry.go
index 575a351f..0856fa3b 100644
--- a/server/infra/registry/registry.go
+++ b/server/infra/registry/registry.go
@@ -144,7 +144,7 @@ const (
 )
 
 const (
-       REQUEST_TIMEOUT = 300
+       REQUEST_TIMEOUT = 30 * time.Second
 
        DEFAULT_PAGE_COUNT = 4096 // grpc does not allow to transport a large 
body more then 4MB in a request.
 )
@@ -359,7 +359,7 @@ func OpCmp(opt CompareOperation, result CompareResult, v 
interface{}) (cmp Compa
 }
 
 func WithTimeout(ctx context.Context) (context.Context, context.CancelFunc) {
-       return context.WithTimeout(ctx, REQUEST_TIMEOUT*time.Second)
+       return context.WithTimeout(ctx, REQUEST_TIMEOUT)
 }
 
 func RegistryConfig() *Config {
diff --git a/server/plugin/infra/registry/embededetcd/embededetcd.go 
b/server/plugin/infra/registry/embededetcd/embededetcd.go
index 4b941bce..3bb71401 100644
--- a/server/plugin/infra/registry/embededetcd/embededetcd.go
+++ b/server/plugin/infra/registry/embededetcd/embededetcd.go
@@ -41,16 +41,17 @@ import (
 
 var embedTLSConfig *tls.Config
 
-const START_MANAGER_SERVER_TIMEOUT = 60
+const START_MANAGER_SERVER_TIMEOUT = 10
 
 func init() {
        mgr.RegisterPlugin(mgr.Plugin{mgr.REGISTRY, "embeded_etcd", 
getEmbedInstance})
 }
 
 type EtcdEmbed struct {
-       Server *embed.Etcd
-       err    chan error
-       ready  chan int
+       Embed     *embed.Etcd
+       err       chan error
+       ready     chan int
+       goroutine *util.GoRoutine
 }
 
 func (s *EtcdEmbed) Err() <-chan error {
@@ -62,9 +63,10 @@ func (s *EtcdEmbed) Ready() <-chan int {
 }
 
 func (s *EtcdEmbed) Close() {
-       if s.Server != nil {
-               s.Server.Close()
+       if s.Embed != nil {
+               s.Embed.Close()
        }
+       s.goroutine.Close(true)
        util.Logger().Debugf("embedded etcd client stopped.")
 }
 
@@ -232,7 +234,7 @@ func (s *EtcdEmbed) Compact(ctx context.Context, reserve 
int64) error {
        }
 
        util.Logger().Infof("Compacting... revision is %d(current: %d, reserve 
%d)", revToCompact, curRev, reserve)
-       _, err := s.Server.Server.Compact(ctx, &etcdserverpb.CompactionRequest{
+       _, err := s.Embed.Server.Compact(ctx, &etcdserverpb.CompactionRequest{
                Revision: revToCompact,
                Physical: true,
        })
@@ -250,7 +252,7 @@ func (s *EtcdEmbed) Compact(ctx context.Context, reserve 
int64) error {
 }
 
 func (s *EtcdEmbed) getLeaderCurrentRevision(ctx context.Context) int64 {
-       return s.Server.Server.KV().Rev()
+       return s.Embed.Server.KV().Rev()
 }
 
 func (s *EtcdEmbed) PutNoOverride(ctx context.Context, opts 
...registry.PluginOpOption) (bool, error) {
@@ -275,7 +277,7 @@ func (s *EtcdEmbed) Do(ctx context.Context, opts 
...registry.PluginOpOption) (*r
        switch op.Action {
        case registry.Get:
                var etcdResp *etcdserverpb.RangeResponse
-               etcdResp, err = s.Server.Server.Range(otCtx, s.toGetRequest(op))
+               etcdResp, err = s.Embed.Server.Range(otCtx, s.toGetRequest(op))
                if err != nil {
                        break
                }
@@ -286,7 +288,7 @@ func (s *EtcdEmbed) Do(ctx context.Context, opts 
...registry.PluginOpOption) (*r
                }
        case registry.Put:
                var etcdResp *etcdserverpb.PutResponse
-               etcdResp, err = s.Server.Server.Put(otCtx, s.toPutRequest(op))
+               etcdResp, err = s.Embed.Server.Put(otCtx, s.toPutRequest(op))
                if err != nil {
                        break
                }
@@ -295,7 +297,7 @@ func (s *EtcdEmbed) Do(ctx context.Context, opts 
...registry.PluginOpOption) (*r
                }
        case registry.Delete:
                var etcdResp *etcdserverpb.DeleteRangeResponse
-               etcdResp, err = s.Server.Server.DeleteRange(otCtx, 
s.toDeleteRequest(op))
+               etcdResp, err = s.Embed.Server.DeleteRange(otCtx, 
s.toDeleteRequest(op))
                if err != nil {
                        break
                }
@@ -338,7 +340,7 @@ func (s *EtcdEmbed) TxnWithCmp(ctx context.Context, success 
[]registry.PluginOp,
        if len(etcdFailOps) > 0 {
                txnRequest.Failure = etcdFailOps
        }
-       resp, err := s.Server.Server.Txn(otCtx, txnRequest)
+       resp, err := s.Embed.Server.Txn(otCtx, txnRequest)
        if err != nil {
                return nil, err
        }
@@ -351,7 +353,7 @@ func (s *EtcdEmbed) TxnWithCmp(ctx context.Context, success 
[]registry.PluginOp,
 func (s *EtcdEmbed) LeaseGrant(ctx context.Context, TTL int64) (int64, error) {
        otCtx, cancel := registry.WithTimeout(ctx)
        defer cancel()
-       etcdResp, err := s.Server.Server.LeaseGrant(otCtx, 
&etcdserverpb.LeaseGrantRequest{
+       etcdResp, err := s.Embed.Server.LeaseGrant(otCtx, 
&etcdserverpb.LeaseGrantRequest{
                TTL: TTL,
        })
        if err != nil {
@@ -363,7 +365,7 @@ func (s *EtcdEmbed) LeaseGrant(ctx context.Context, TTL 
int64) (int64, error) {
 func (s *EtcdEmbed) LeaseRenew(ctx context.Context, leaseID int64) (int64, 
error) {
        otCtx, cancel := registry.WithTimeout(ctx)
        defer cancel()
-       ttl, err := s.Server.Server.LeaseRenew(otCtx, lease.LeaseID(leaseID))
+       ttl, err := s.Embed.Server.LeaseRenew(otCtx, lease.LeaseID(leaseID))
        if err != nil {
                if err.Error() == grpc.ErrorDesc(rpctypes.ErrGRPCLeaseNotFound) 
{
                        return 0, err
@@ -376,7 +378,7 @@ func (s *EtcdEmbed) LeaseRenew(ctx context.Context, leaseID 
int64) (int64, error
 func (s *EtcdEmbed) LeaseRevoke(ctx context.Context, leaseID int64) error {
        otCtx, cancel := registry.WithTimeout(ctx)
        defer cancel()
-       _, err := s.Server.Server.LeaseRevoke(otCtx, 
&etcdserverpb.LeaseRevokeRequest{
+       _, err := s.Embed.Server.LeaseRevoke(otCtx, 
&etcdserverpb.LeaseRevokeRequest{
                ID: leaseID,
        })
        if err != nil {
@@ -392,7 +394,7 @@ func (s *EtcdEmbed) Watch(ctx context.Context, opts 
...registry.PluginOpOption)
        op := registry.OpGet(opts...)
 
        if len(op.Key) > 0 {
-               watchable := s.Server.Server.Watchable()
+               watchable := s.Embed.Server.Watchable()
                ws := watchable.NewWatchStream()
                defer ws.Close()
 
@@ -455,6 +457,29 @@ func (s *EtcdEmbed) Watch(ctx context.Context, opts 
...registry.PluginOpOption)
        return
 }
 
+func (s *EtcdEmbed) ReadyNotify() {
+       timeout := START_MANAGER_SERVER_TIMEOUT * time.Second
+       select {
+       case <-s.Embed.Server.ReadyNotify():
+               close(s.ready)
+               s.goroutine.Do(func(ctx context.Context) {
+                       select {
+                       case <-ctx.Done():
+                               return
+                       case err := <-s.Embed.Err():
+                               s.err <- err
+                       }
+               })
+       case <-time.After(timeout):
+               err := fmt.Errorf("timed out(%s)", timeout)
+               util.Logger().Errorf(err, "read notify failed")
+
+               s.Embed.Server.Stop()
+
+               s.err <- err
+       }
+}
+
 func setKvsAndConvertAction(kvs []*mvccpb.KeyValue, pIdx int, evt 
*mvccpb.Event) registry.ActionType {
        switch evt.Type {
        case mvccpb.DELETE:
@@ -488,8 +513,9 @@ func getEmbedInstance() mgr.PluginInstance {
        addrs := beego.AppConfig.DefaultString("manager_addr", 
"http://127.0.0.1:2380")
 
        inst := &EtcdEmbed{
-               err:   make(chan error, 1),
-               ready: make(chan int),
+               err:       make(chan error, 1),
+               ready:     make(chan int),
+               goroutine: util.NewGo(context.Background()),
        }
 
        if core.ServerInfo.Config.SslEnabled {
@@ -537,30 +563,14 @@ func getEmbedInstance() mgr.PluginInstance {
                inst.err <- err
                return inst
        }
-       inst.Server = etcd
-
-       select {
-       case <-etcd.Server.ReadyNotify():
-               close(inst.ready)
-               go func() {
-                       select {
-                       case err := <-etcd.Err():
-                               inst.err <- err
-                       }
-               }()
-       case <-time.After(START_MANAGER_SERVER_TIMEOUT * time.Second):
-               message := "etcd server took too long to start"
-               util.Logger().Error(message, nil)
+       inst.Embed = etcd
 
-               etcd.Server.Stop()
-
-               inst.err <- errors.New(message)
-       }
+       inst.ReadyNotify()
        return inst
 }
 
 func parseURL(addrs string) ([]url.URL, error) {
-       urls := []url.URL{}
+       var urls []url.URL
        ips := strings.Split(addrs, ",")
        for _, ip := range ips {
                addr, err := url.Parse(ip)
diff --git a/server/plugin/infra/tracing/buildin/file_collector.go 
b/server/plugin/infra/tracing/buildin/file_collector.go
index bd48e5bd..851b3fc7 100644
--- a/server/plugin/infra/tracing/buildin/file_collector.go
+++ b/server/plugin/infra/tracing/buildin/file_collector.go
@@ -23,15 +23,19 @@ import (
        "github.com/apache/incubator-servicecomb-service-center/pkg/util"
        "github.com/apache/incubator-servicecomb-service-center/server/core"
        "github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore"
+       "golang.org/x/net/context"
        "os"
+       "strings"
        "time"
 )
 
 type FileCollector struct {
        Fd        *os.File
+       Timeout   time.Duration
        Interval  time.Duration
        BatchSize int
        c         chan *zipkincore.Span
+       goroutine *util.GoRoutine
 }
 
 func (f *FileCollector) Collect(span *zipkincore.Span) error {
@@ -39,11 +43,16 @@ func (f *FileCollector) Collect(span *zipkincore.Span) 
error {
                return fmt.Errorf("required FD to write")
        }
 
-       f.c <- span
+       select {
+       case f.c <- span:
+       case <-time.After(f.Timeout):
+               util.Logger().Errorf(nil, "send span to handle channel timed 
out(%s)", f.Timeout)
+       }
        return nil
 }
 
 func (f *FileCollector) Close() error {
+       f.goroutine.Close(true)
        return f.Fd.Close()
 }
 
@@ -77,7 +86,7 @@ func (f *FileCollector) write(batch []*zipkincore.Span) (c 
int) {
 }
 
 func (f *FileCollector) checkFile() error {
-       if util.PathExist(f.Fd.Name()) {
+       if util.PathExist(f.Fd.Name()) || strings.Index(f.Fd.Name(), "/dev/") 
== 0 {
                return nil
        }
 
@@ -100,52 +109,54 @@ func (f *FileCollector) checkFile() error {
        return nil
 }
 
-func (f *FileCollector) Run(stopCh <-chan struct{}) {
-       var (
-               batch []*zipkincore.Span
-               prev  []*zipkincore.Span
-               i     = f.Interval * 10
-               t     = time.NewTicker(f.Interval)
-               nr    = time.Now().Add(i)
-               max   = f.BatchSize * 2
-       )
-       for {
-               select {
-               case <-stopCh:
-                       f.write(batch)
-                       return
-               case span := <-f.c:
-                       batch = append(batch, span)
-                       if len(batch) >= f.BatchSize {
-                               if len(batch) > max {
-                                       dispose := len(batch) - f.BatchSize
-                                       util.Logger().Errorf(nil, "backlog is 
full, dispose %d span(s), max: %d",
-                                               dispose, max)
-                                       batch = batch[dispose:] // allocate more
-                               }
-                               if c := f.write(batch); c == 0 {
-                                       continue
+func (f *FileCollector) Run() {
+       f.goroutine.Do(func(ctx context.Context) {
+               var (
+                       batch []*zipkincore.Span
+                       prev  []*zipkincore.Span
+                       i     = f.Interval * 10
+                       t     = time.NewTicker(f.Interval)
+                       nr    = time.Now().Add(i)
+                       max   = f.BatchSize * 2
+               )
+               for {
+                       select {
+                       case <-ctx.Done():
+                               f.write(batch)
+                               return
+                       case span := <-f.c:
+                               batch = append(batch, span)
+                               if len(batch) >= f.BatchSize {
+                                       if len(batch) > max {
+                                               dispose := len(batch) - 
f.BatchSize
+                                               util.Logger().Errorf(nil, 
"backlog is full, dispose %d span(s), max: %d",
+                                                       dispose, max)
+                                               batch = batch[dispose:] // 
allocate more
+                                       }
+                                       if c := f.write(batch); c == 0 {
+                                               continue
+                                       }
+                                       if prev != nil {
+                                               batch, prev = prev[:0], batch
+                                       } else {
+                                               prev, batch = batch, 
batch[len(batch):] // new one
+                                       }
                                }
-                               if prev != nil {
-                                       batch, prev = prev[:0], batch
-                               } else {
-                                       prev, batch = batch, batch[len(batch):] 
// new one
+                       case <-t.C:
+                               if time.Now().After(nr) {
+                                       util.LogRotateFile(f.Fd.Name(),
+                                               
int(core.ServerInfo.Config.LogRotateSize),
+                                               
int(core.ServerInfo.Config.LogBackupCount),
+                                       )
+                                       nr = time.Now().Add(i)
                                }
-                       }
-               case <-t.C:
-                       if time.Now().After(nr) {
-                               util.LogRotateFile(f.Fd.Name(),
-                                       
int(core.ServerInfo.Config.LogRotateSize),
-                                       
int(core.ServerInfo.Config.LogBackupCount),
-                               )
-                               nr = time.Now().Add(i)
-                       }
 
-                       if c := f.write(batch); c > 0 {
-                               batch = batch[:0]
+                               if c := f.write(batch); c > 0 {
+                                       batch = batch[:0]
+                               }
                        }
                }
-       }
+       })
 }
 
 func NewFileCollector(path string) (*FileCollector, error) {
@@ -155,10 +166,12 @@ func NewFileCollector(path string) (*FileCollector, 
error) {
        }
        fc := &FileCollector{
                Fd:        fd,
+               Timeout:   5 * time.Second,
                Interval:  10 * time.Second,
                BatchSize: 100,
                c:         make(chan *zipkincore.Span, 1000),
+               goroutine: util.NewGo(context.Background()),
        }
-       util.Go(fc.Run)
+       fc.Run()
        return fc, nil
 }
diff --git a/server/plugin/infra/tracing/buildin/file_collector_test.go 
b/server/plugin/infra/tracing/buildin/file_collector_test.go
index 075fdaec..4f3e1505 100644
--- a/server/plugin/infra/tracing/buildin/file_collector_test.go
+++ b/server/plugin/infra/tracing/buildin/file_collector_test.go
@@ -17,34 +17,33 @@
 package buildin
 
 import (
+       "fmt"
        "github.com/apache/incubator-servicecomb-service-center/pkg/util"
        "github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore"
+       "golang.org/x/net/context"
        "os"
        "testing"
        "time"
 )
 
 func TestFileCollector_Collect(t *testing.T) {
-       fileName := "./test"
-       fd, err := os.OpenFile(fileName, os.O_APPEND|os.O_CREATE|os.O_RDWR, 
0600)
-       if err != nil {
-               t.FailNow()
-       }
        fc := &FileCollector{
-               Fd:        fd,
+               Fd:        os.Stdout,
+               Timeout:   1 * time.Second,
                Interval:  100 * time.Second,
                BatchSize: 2,
-               c:         make(chan *zipkincore.Span, 1000),
+               c:         make(chan *zipkincore.Span, 100),
+               goroutine: util.NewGo(context.Background()),
        }
        defer func() {
                fc.Close()
-               os.Remove(fileName)
        }()
-       util.Go(fc.Run)
+       fc.Run()
 
-       for i := int64(0); i < 3; i++ {
-               err := fc.Collect(&zipkincore.Span{ParentID: &i, TraceIDHigh: 
&i})
+       for i := 0; i < 10; i++ {
+               err := fc.Collect(&zipkincore.Span{})
                if err != nil {
+                       fmt.Println(err)
                        t.FailNow()
                }
        }
diff --git a/server/plugin/infra/tracing/buildin/span.go 
b/server/plugin/infra/tracing/buildin/span.go
index 5b8011d9..3d200887 100644
--- a/server/plugin/infra/tracing/buildin/span.go
+++ b/server/plugin/infra/tracing/buildin/span.go
@@ -61,7 +61,9 @@ type Endpoint struct {
 func (s *Span) FromZipkinSpan(span *zipkincore.Span) {
        traceId := new(types.TraceID)
        traceId.Low = uint64(span.TraceID)
-       traceId.High = uint64(*(span.TraceIDHigh))
+       if span.TraceIDHigh != nil {
+               traceId.High = uint64(*(span.TraceIDHigh))
+       }
        s.TraceID = traceId.ToHex()
        s.Duration = span.Duration
 
diff --git a/server/plugin/infra/tracing/buildin/span_test.go 
b/server/plugin/infra/tracing/buildin/span_test.go
index c3dceb49..069e6a9a 100644
--- a/server/plugin/infra/tracing/buildin/span_test.go
+++ b/server/plugin/infra/tracing/buildin/span_test.go
@@ -158,4 +158,12 @@ func TestFromZipkinSpan(t *testing.T) {
                t.FailNow()
        }
        fmt.Println(string(b))
+
+       s = FromZipkinSpan(&zipkincore.Span{})
+       b, err = json.Marshal(s)
+       if err != nil {
+               fmt.Println("TestFromZipkinSpan Marshal", err)
+               t.FailNow()
+       }
+       fmt.Println(string(b))
 }
diff --git a/server/server.go b/server/server.go
index a5468798..15e0abd3 100644
--- a/server/server.go
+++ b/server/server.go
@@ -43,6 +43,7 @@ func init() {
                store:         st.Store(),
                notifyService: nf.GetNotifyService(),
                apiServer:     GetAPIServer(),
+               goroutine:     util.NewGo(context.Background()),
        }
 }
 
@@ -50,6 +51,7 @@ type ServiceCenterServer struct {
        apiServer     *APIServer
        notifyService *nf.NotifyService
        store         *st.KvStore
+       goroutine     *util.GoRoutine
 }
 
 func (s *ServiceCenterServer) Run() {
@@ -74,7 +76,7 @@ func (s *ServiceCenterServer) waitForQuit() {
 
        s.Stop()
 
-       util.Logger().Warn("service center quit", nil)
+       util.Logger().Debugf("service center stopped")
 }
 
 func (s *ServiceCenterServer) needUpgrade() bool {
@@ -119,12 +121,12 @@ func (s *ServiceCenterServer) autoCompactBackend() {
                util.Logger().Errorf(err, "invalid compact interval %s, reset 
to default interval 12h", core.ServerInfo.Config.CompactInterval)
                interval = 12 * time.Hour
        }
-       util.Go(func(stopCh <-chan struct{}) {
+       s.goroutine.Do(func(ctx context.Context) {
                util.Logger().Infof("enabled the automatic compact mechanism, 
compact once every %s, reserve %d",
                        core.ServerInfo.Config.CompactInterval, delta)
                for {
                        select {
-                       case <-stopCh:
+                       case <-ctx.Done():
                                return
                        case <-time.After(interval):
                                lock, err := mux.Try(mux.GLOBAL_LOCK)
@@ -133,7 +135,7 @@ func (s *ServiceCenterServer) autoCompactBackend() {
                                        continue
                                }
 
-                               
backend.Registry().Compact(context.Background(), delta)
+                               backend.Registry().Compact(ctx, delta)
 
                                lock.Unlock()
                        }
@@ -190,9 +192,7 @@ func (s *ServiceCenterServer) Stop() {
                s.store.Stop()
        }
 
-       util.GoCloseAndWait()
-
-       backend.Registry().Close()
+       s.goroutine.Close(true)
 }
 
 func Run() {
diff --git a/server/service/event/dependency_event_handler.go 
b/server/service/event/dependency_event_handler.go
index 7bbe5df8..6961c816 100644
--- a/server/service/event/dependency_event_handler.go
+++ b/server/service/event/dependency_event_handler.go
@@ -50,7 +50,7 @@ func (h *DependencyEventHandler) OnEvent(evt *store.KvEvent) {
 }
 
 func (h *DependencyEventHandler) loop() {
-       util.Go(func(stopCh <-chan struct{}) {
+       util.Go(func(ctx context.Context) {
                waitDelayIndex := 0
                waitDelay := []int{1, 1, 5, 10, 20, 30, 60}
                retry := func() {
@@ -64,7 +64,7 @@ func (h *DependencyEventHandler) loop() {
                }
                for {
                        select {
-                       case <-stopCh:
+                       case <-ctx.Done():
                                return
                        case <-h.signals.Chan():
                                lock, err := mux.Try(mux.DEP_QUEUE_LOCK)
diff --git a/server/service/instances.go b/server/service/instances.go
index 48c70296..eb945069 100644
--- a/server/service/instances.go
+++ b/server/service/instances.go
@@ -370,19 +370,7 @@ func (s *InstanceService) HeartbeatSet(ctx 
context.Context, in *pb.HeartbeatSetR
                        
existFlag[heartbeatElement.ServiceId+heartbeatElement.InstanceId] = true
                        noMultiCounter++
                }
-               go func(element *pb.HeartbeatSetElement) {
-                       hbRst := &pb.InstanceHbRst{
-                               ServiceId:  element.ServiceId,
-                               InstanceId: element.InstanceId,
-                               ErrMessage: "",
-                       }
-                       _, _, err, _ := serviceUtil.HeartbeatUtil(ctx, 
domainProject, element.ServiceId, element.InstanceId)
-                       if err != nil {
-                               hbRst.ErrMessage = err.Error()
-                               util.Logger().Errorf(err, "heartbeat set 
failed, %s/%s", element.ServiceId, element.InstanceId)
-                       }
-                       instancesHbRst <- hbRst
-               }(heartbeatElement)
+               util.Go(getHeartbeatFunc(ctx, domainProject, instancesHbRst, 
heartbeatElement))
        }
        count := 0
        successFlag := false
@@ -415,6 +403,22 @@ func (s *InstanceService) HeartbeatSet(ctx 
context.Context, in *pb.HeartbeatSetR
        }
 }
 
+func getHeartbeatFunc(ctx context.Context, domainProject string, 
instancesHbRst chan<- *pb.InstanceHbRst, element *pb.HeartbeatSetElement) 
func(context.Context) {
+       return func(_ context.Context) {
+               hbRst := &pb.InstanceHbRst{
+                       ServiceId:  element.ServiceId,
+                       InstanceId: element.InstanceId,
+                       ErrMessage: "",
+               }
+               _, _, err, _ := serviceUtil.HeartbeatUtil(ctx, domainProject, 
element.ServiceId, element.InstanceId)
+               if err != nil {
+                       hbRst.ErrMessage = err.Error()
+                       util.Logger().Errorf(err, "heartbeat set failed, 
%s/%s", element.ServiceId, element.InstanceId)
+               }
+               instancesHbRst <- hbRst
+       }
+}
+
 func (s *InstanceService) GetOneInstance(ctx context.Context, in 
*pb.GetOneInstanceRequest) (*pb.GetOneInstanceResponse, error) {
        checkErr := s.getInstancePreCheck(ctx, in)
        if checkErr != nil {
@@ -723,7 +727,6 @@ func (s *InstanceService) UpdateInstanceProperties(ctx 
context.Context, in *pb.U
        }, nil
 }
 
-
 func (s *InstanceService) WatchPreOpera(ctx context.Context, in 
*pb.WatchInstanceRequest) error {
        if in == nil || len(in.SelfServiceId) == 0 {
                return errors.New("Request format invalid.")
@@ -742,7 +745,7 @@ func (s *InstanceService) Watch(in 
*pb.WatchInstanceRequest, stream pb.ServiceIn
                return err
        }
        domainProject := util.ParseDomainProject(stream.Context())
-       watcher := nf.NewInstanceWatcher(in.SelfServiceId, 
apt.GetInstanceRootKey(domainProject)+"/")
+       watcher := nf.NewInstanceListWatcher(in.SelfServiceId, 
apt.GetInstanceRootKey(domainProject)+"/", nil)
        err = nf.GetNotifyService().AddSubscriber(watcher)
        util.Logger().Infof("start watch instance status, watcher %s %s", 
watcher.Subject(), watcher.Id())
        return nf.HandleWatchJob(watcher, stream, 
nf.GetNotifyService().Config.NotifyTimeout)
@@ -754,7 +757,7 @@ func (s *InstanceService) WebSocketWatch(ctx 
context.Context, in *pb.WatchInstan
                nf.EstablishWebSocketError(conn, err)
                return
        }
-       nf.DoWebSocketWatch(ctx, in.SelfServiceId, conn)
+       nf.DoWebSocketListAndWatch(ctx, in.SelfServiceId, nil, conn)
 }
 
 func (s *InstanceService) WebSocketListAndWatch(ctx context.Context, in 
*pb.WatchInstanceRequest, conn *websocket.Conn) {
diff --git a/server/service/microservices.go b/server/service/microservices.go
index 4fe47d29..6663ab99 100644
--- a/server/service/microservices.go
+++ b/server/service/microservices.go
@@ -359,11 +359,6 @@ func (s *MicroServiceService) DeleteServices(ctx 
context.Context, request *pb.De
                        nuoMultilCount++
                }
 
-               serviceRst := &pb.DelServicesRspInfo{
-                       ServiceId:  serviceId,
-                       ErrMessage: "",
-               }
-
                //检查服务ID合法性
                in := &pb.DeleteServiceRequest{
                        ServiceId: serviceId,
@@ -372,22 +367,15 @@ func (s *MicroServiceService) DeleteServices(ctx context.Context, request *pb.De
                err := apt.Validate(in)
                if err != nil {
                        util.Logger().Errorf(err, "delete micro-service failed, serviceId is %s: invalid parameters.", in.ServiceId)
-                       serviceRst.ErrMessage = err.Error()
-                       serviceRespChan <- serviceRst
+                       serviceRespChan <- &pb.DelServicesRspInfo{
+                               ServiceId:  serviceId,
+                               ErrMessage: err.Error(),
+                       }
                        continue
                }
 
                //执行删除服务操作
-               go func(serviceItem string) {
-                       resp, err := s.DeleteServicePri(ctx, serviceItem, request.Force)
-                       if err != nil {
-                               serviceRst.ErrMessage = err.Error()
-                       } else if resp.Code != pb.Response_SUCCESS {
-                               serviceRst.ErrMessage = resp.Message
-                       }
-
-                       serviceRespChan <- serviceRst
-               }(serviceId)
+               util.Go(s.getDeleteServiceFunc(ctx, serviceId, request.Force, serviceRespChan))
        }
 
        //获取批量删除服务的结果
@@ -419,6 +407,23 @@ func (s *MicroServiceService) DeleteServices(ctx context.Context, request *pb.De
        return resp, nil
 }
 
+func (s *MicroServiceService) getDeleteServiceFunc(ctx context.Context, serviceId string, force bool, serviceRespChan chan<- *pb.DelServicesRspInfo) func(context.Context) {
+       return func(_ context.Context) {
+               serviceRst := &pb.DelServicesRspInfo{
+                       ServiceId:  serviceId,
+                       ErrMessage: "",
+               }
+               resp, err := s.DeleteServicePri(ctx, serviceId, force)
+               if err != nil {
+                       serviceRst.ErrMessage = err.Error()
+               } else if resp.Code != pb.Response_SUCCESS {
+                       serviceRst.ErrMessage = resp.Message
+               }
+
+               serviceRespChan <- serviceRst
+       }
+}
+
 func (s *MicroServiceService) GetOne(ctx context.Context, in *pb.GetServiceRequest) (*pb.GetServiceResponse, error) {
        if in == nil || len(in.ServiceId) == 0 {
                return &pb.GetServiceResponse{
@@ -437,7 +442,7 @@ func (s *MicroServiceService) GetOne(ctx context.Context, in *pb.GetServiceReque
        service, err := serviceUtil.GetService(ctx, domainProject, in.ServiceId)
 
        if err != nil {
-               util.Logger().Errorf(err, "get micro-service failed, serviceId is %s: inner err,get service failed.", in.ServiceId)
+               util.Logger().Errorf(err, "get micro-service failed, serviceId is %s: inner err, get service failed.", in.ServiceId)
                return &pb.GetServiceResponse{
                        Response: pb.CreateResponse(scerr.ErrInternal, "Get service file failed."),
                }, err
@@ -655,7 +660,7 @@ func (s *MicroServiceService) CreateServiceEx(ctx context.Context, in *pb.Create
        //create rules
        if in.Rules != nil && len(in.Rules) != 0 {
                chanLen++
-               go func() {
+               util.Go(func(_ context.Context) {
                        req := &pb.AddServiceRulesRequest{
                                ServiceId: serviceId,
                                Rules:     in.Rules,
@@ -670,12 +675,12 @@ func (s *MicroServiceService) CreateServiceEx(ctx context.Context, in *pb.Create
                                chanRsp.Message = rsp.Response.Message
                        }
                        createRespChan <- chanRsp
-               }()
+               })
        }
        //create tags
        if in.Tags != nil && len(in.Tags) != 0 {
                chanLen++
-               go func() {
+               util.Go(func(_ context.Context) {
                        req := &pb.AddServiceTagsRequest{
                                ServiceId: serviceId,
                                Tags:      in.Tags,
@@ -690,12 +695,12 @@ func (s *MicroServiceService) CreateServiceEx(ctx context.Context, in *pb.Create
                                chanRsp.Message = rsp.Response.Message
                        }
                        createRespChan <- chanRsp
-               }()
+               })
        }
        // create instance
        if in.Instances != nil && len(in.Instances) != 0 {
                chanLen++
-               go func() {
+               util.Go(func(_ context.Context) {
                        chanRsp := &pb.Response{}
                        for _, ins := range in.Instances {
                                req := &pb.RegisterInstanceRequest{
@@ -711,7 +716,7 @@ func (s *MicroServiceService) CreateServiceEx(ctx context.Context, in *pb.Create
                                }
                                createRespChan <- chanRsp
                        }
-               }()
+               })
        }
 
        // handle result
diff --git a/server/service/notification/listwatcher.go b/server/service/notification/listwatcher.go
index 8a340760..de0e2c66 100644
--- a/server/service/notification/listwatcher.go
+++ b/server/service/notification/listwatcher.go
@@ -19,6 +19,7 @@ package notification
 import (
        "github.com/apache/incubator-servicecomb-service-center/pkg/util"
        pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto"
+       "golang.org/x/net/context"
        "time"
 )
 
@@ -44,10 +45,10 @@ func (w *ListWatcher) OnAccept() {
        }
 
        util.Logger().Debugf("accepted by notify service, %s watcher %s %s", w.Type(), w.Id(), w.Subject())
-       go w.listAndPublishJobs()
+       util.Go(w.listAndPublishJobs)
 }
 
-func (w *ListWatcher) listAndPublishJobs() {
+func (w *ListWatcher) listAndPublishJobs(_ context.Context) {
        defer close(w.listCh)
        if w.ListFunc == nil {
                return
@@ -112,10 +113,6 @@ func NewWatchJob(nType NotifyType, subscriberId, subject string, rev int64, resp
        }
 }
 
-func NewWatcher(nType NotifyType, id string, subject string) *ListWatcher {
-       return NewListWatcher(nType, id, subject, nil)
-}
-
 func NewListWatcher(nType NotifyType, id string, subject string,
        listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) *ListWatcher {
        watcher := &ListWatcher{
diff --git a/server/service/notification/notification_service.go b/server/service/notification/notification_service.go
index e2d78640..63f78a27 100644
--- a/server/service/notification/notification_service.go
+++ b/server/service/notification/notification_service.go
@@ -20,6 +20,7 @@ import (
        "container/list"
        "errors"
        "github.com/apache/incubator-servicecomb-service-center/pkg/util"
+       "golang.org/x/net/context"
        "sync"
        "time"
 )
@@ -33,7 +34,8 @@ var notifyService *NotifyService
 
 func init() {
        notifyService = &NotifyService{
-               isClose: true,
+               isClose:   true,
+               goroutine: util.NewGo(context.Background()),
        }
 }
 
@@ -46,13 +48,14 @@ type serviceIndex map[NotifyType]subscriberSubjectIndex
 type NotifyService struct {
        Config NotifyServiceConfig
 
-       services serviceIndex
-       queues   map[NotifyType]chan NotifyJob
-       waits    sync.WaitGroup
-       mutexes  map[NotifyType]*sync.Mutex
-       err      chan error
-       closeMux sync.RWMutex
-       isClose  bool
+       services  serviceIndex
+       queues    map[NotifyType]chan NotifyJob
+       waits     sync.WaitGroup
+       mutexes   map[NotifyType]*sync.Mutex
+       err       chan error
+       closeMux  sync.RWMutex
+       isClose   bool
+       goroutine *util.GoRoutine
 }
 
 func (s *NotifyService) Err() <-chan error {
@@ -150,41 +153,52 @@ func (s *NotifyService) AddJob(job NotifyJob) error {
        }
 }
 
-func (s *NotifyService) publish2Subscriber(t NotifyType) {
-       defer s.waits.Done()
-       for job := range s.queues[t] {
-               util.Logger().Infof("notification service got a job %s: %s to notify subscriber %s",
-                       job.Type(), job.Subject(), job.SubscriberId())
+func (s *NotifyService) getPublish2SubscriberFunc(t NotifyType) func(context.Context) {
+       return func(ctx context.Context) {
+               defer s.waits.Done()
+               for {
+                       select {
+                       case <-ctx.Done():
+                               return
+                       case job, ok := <-s.queues[t]:
+                               if !ok {
+                                       return
+                               }
 
-               s.mutexes[t].Lock()
+                               util.Logger().Infof("notification service got a job %s: %s to notify subscriber %s",
+                                       job.Type(), job.Subject(), job.SubscriberId())
 
-               if s.Closed() && len(s.services[t]) == 0 {
-                       s.mutexes[t].Unlock()
-                       return
-               }
+                               s.mutexes[t].Lock()
 
-               m, ok := s.services[t][job.Subject()]
-               if ok {
-                       // publish的subject如果带上id,则单播,否则广播
-                       if len(job.SubscriberId()) != 0 {
-                               ns, ok := m[job.SubscriberId()]
+                               if s.Closed() && len(s.services[t]) == 0 {
+                                       s.mutexes[t].Unlock()
+                                       return
+                               }
+
+                               m, ok := s.services[t][job.Subject()]
                                if ok {
-                                       for n := ns.Front(); n != nil; n = n.Next() {
-                                               n.Value.(Subscriber).OnMessage(job)
+                                       // publish的subject如果带上id,则单播,否则广播
+                                       if len(job.SubscriberId()) != 0 {
+                                               ns, ok := m[job.SubscriberId()]
+                                               if ok {
+                                                       for n := ns.Front(); n != nil; n = n.Next() {
+                                                               n.Value.(Subscriber).OnMessage(job)
+                                                       }
+                                               }
+                                               s.mutexes[t].Unlock()
+                                               continue
+                                       }
+                                       for key := range m {
+                                               ns := m[key]
+                                               for n := ns.Front(); n != nil; n = n.Next() {
+                                                       n.Value.(Subscriber).OnMessage(job)
+                                               }
                                        }
                                }
+
                                s.mutexes[t].Unlock()
-                               continue
-                       }
-                       for key := range m {
-                               ns := m[key]
-                               for n := ns.Front(); n != nil; n = n.Next() {
-                                       n.Value.(Subscriber).OnMessage(job)
-                               }
                        }
                }
-
-               s.mutexes[t].Unlock()
        }
 }
 
@@ -227,7 +241,7 @@ func (s *NotifyService) Start() {
        util.Logger().Debugf("notify service is started with config %s", s.Config)
 
        for i := NotifyType(0); i != typeEnd; i++ {
-               go s.publish2Subscriber(i)
+               s.goroutine.Do(s.getPublish2SubscriberFunc(i))
        }
 }
 
@@ -255,6 +269,8 @@ func (s *NotifyService) Stop() {
 
        close(s.err)
 
+       s.goroutine.Close(true)
+
        util.Logger().Debug("notify service stopped.")
 }
 
diff --git a/server/service/notification/watch_util.go b/server/service/notification/watch_util.go
index c938719c..9a7e26c5 100644
--- a/server/service/notification/watch_util.go
+++ b/server/service/notification/watch_util.go
@@ -62,6 +62,7 @@ type WebSocketHandler struct {
        watcher         *ListWatcher
        needPingWatcher bool
        closed          chan struct{}
+       goroutine       *util.GoRoutine
 }
 
 func (wh *WebSocketHandler) Init() error {
@@ -101,7 +102,7 @@ func (wh *WebSocketHandler) websocketHeartbeat(messageType int) error {
        return nil
 }
 
-func (wh *WebSocketHandler) HandleWatchWebSocketControlMessage() {
+func (wh *WebSocketHandler) HandleWatchWebSocketControlMessage(ctx context.Context) {
        defer close(wh.closed)
 
        remoteAddr := wh.conn.RemoteAddr().String()
@@ -128,17 +129,23 @@ func (wh *WebSocketHandler) HandleWatchWebSocketControlMessage() {
        })
 
        for {
-               _, _, err := wh.conn.ReadMessage()
-               if err != nil {
-                       wh.watcher.SetError(err)
+               select {
+               case <-ctx.Done():
                        return
+               default:
+                       _, _, err := wh.conn.ReadMessage()
+                       if err != nil {
+                               wh.watcher.SetError(err)
+                               return
+                       }
                }
        }
 }
 
 func (wh *WebSocketHandler) HandleWatchWebSocketJob() {
-       remoteAddr := wh.conn.RemoteAddr().String()
+       wh.goroutine.Do(wh.HandleWatchWebSocketControlMessage)
 
+       remoteAddr := wh.conn.RemoteAddr().String()
        for {
                select {
                case <-wh.closed:
@@ -224,8 +231,10 @@ func (wh *WebSocketHandler) HandleWatchWebSocketJob() {
 }
 
 func (wh *WebSocketHandler) Close(code int, text string) error {
+       defer wh.goroutine.Close(true)
+
        remoteAddr := wh.conn.RemoteAddr().String()
-       message := []byte{}
+       var message []byte
        if code != websocket.CloseNoStatusReceived {
                message = websocket.FormatCloseMessage(code, text)
        }
@@ -238,18 +247,6 @@ func (wh *WebSocketHandler) Close(code int, text string) error {
        return nil
 }
 
-func DoWebSocketWatch(ctx context.Context, serviceId string, conn *websocket.Conn) {
-       domainProject := util.ParseDomainProject(ctx)
-       handler := &WebSocketHandler{
-               ctx:             ctx,
-               conn:            conn,
-               watcher:         NewInstanceWatcher(serviceId, apt.GetInstanceRootKey(domainProject)+"/"),
-               needPingWatcher: true,
-               closed:          make(chan struct{}),
-       }
-       processHandler(handler)
-}
-
 func DoWebSocketListAndWatch(ctx context.Context, serviceId string, f func() ([]*pb.WatchInstanceResponse, int64), conn *websocket.Conn) {
        domainProject := util.ParseDomainProject(ctx)
        handler := &WebSocketHandler{
@@ -258,6 +255,7 @@ func DoWebSocketListAndWatch(ctx context.Context, serviceId string, f func() ([]
                watcher:         NewInstanceListWatcher(serviceId, apt.GetInstanceRootKey(domainProject)+"/", f),
                needPingWatcher: true,
                closed:          make(chan struct{}),
+               goroutine:       util.NewGo(context.Background()),
        }
        processHandler(handler)
 }
@@ -266,7 +264,6 @@ func processHandler(handler *WebSocketHandler) {
        if err := handler.Init(); err != nil {
                return
        }
-       go handler.HandleWatchWebSocketControlMessage()
        handler.HandleWatchWebSocketJob()
 }
 
@@ -294,10 +291,6 @@ func PublishInstanceEvent(domainProject string, action pb.EventType, serviceKey
        }
 }
 
-func NewInstanceWatcher(selfServiceId, instanceRoot string) *ListWatcher {
-       return NewWatcher(INSTANCE, selfServiceId, instanceRoot)
-}
-
 func NewInstanceListWatcher(selfServiceId, instanceRoot string, listFunc func() (results []*pb.WatchInstanceResponse, rev int64)) *ListWatcher {
        return NewListWatcher(INSTANCE, selfServiceId, instanceRoot, listFunc)
 }
diff --git a/server/service/util/dependency.go b/server/service/util/dependency.go
index 66261163..f0837ee4 100644
--- a/server/service/util/dependency.go
+++ b/server/service/util/dependency.go
@@ -557,11 +557,10 @@ type Dependency struct {
 
 func (dep *Dependency) RemoveConsumerOfProviderRule() {
        dep.chanNum++
-       go dep.removeConsumerOfProviderRule()
+       util.Go(dep.removeConsumerOfProviderRule)
 }
 
-func (dep *Dependency) removeConsumerOfProviderRule() {
-       ctx := context.TODO()
+func (dep *Dependency) removeConsumerOfProviderRule(ctx context.Context) {
        opts := make([]registry.PluginOp, 0, len(dep.removedDependencyRuleList))
        for _, providerRule := range dep.removedDependencyRuleList {
                proProkey := apt.GenerateProviderDependencyRuleKey(providerRule.Tenant, providerRule)
@@ -605,11 +604,10 @@ func (dep *Dependency) removeConsumerOfProviderRule() {
 
 func (dep *Dependency) AddConsumerOfProviderRule() {
        dep.chanNum++
-       go dep.addConsumerOfProviderRule()
+       util.Go(dep.addConsumerOfProviderRule)
 }
 
-func (dep *Dependency) addConsumerOfProviderRule() {
-       ctx := context.TODO()
+func (dep *Dependency) addConsumerOfProviderRule(ctx context.Context) {
        opts := []registry.PluginOp{}
        for _, providerRule := range dep.NewDependencyRuleList {
                proProkey := apt.GenerateProviderDependencyRuleKey(providerRule.Tenant, providerRule)


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to