[ https://issues.apache.org/jira/browse/SCB-924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16621364#comment-16621364 ]
ASF GitHub Bot commented on SCB-924: ------------------------------------ little-cui closed pull request #444: SCB-924 Etcd cacher should re-list etcd in fixed time interval URL: https://github.com/apache/incubator-servicecomb-service-center/pull/444 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/scripts/build/local.sh b/scripts/build/local.sh index dec2147d..3e38b81f 100644 --- a/scripts/build/local.sh +++ b/scripts/build/local.sh @@ -19,6 +19,8 @@ set -e export GOOS=${1:-"linux"} export GOARCH=${4:-"amd64"} +export CGO_ENABLED=${CGO_ENABLED:-0} # prevent to compile cgo file +export GO_EXTLINK_ENABLED=${GO_EXTLINK_ENABLED:-0} # do not use host linker export GO_LDFLAGS=${GO_LDFLAGS:-"-linkmode 'external' -extldflags '-static' -s -w"} RELEASE=${2:-"0.0.1"} diff --git a/scripts/release/make_release.sh b/scripts/release/make_release.sh index fe9e3c3c..137b163c 100755 --- a/scripts/release/make_release.sh +++ b/scripts/release/make_release.sh @@ -57,6 +57,8 @@ esac ## Get the arch type export GOARCH=${4:-"amd64"} +export CGO_ENABLED=${CGO_ENABLED:-0} # prevent to compile cgo file +export GO_EXTLINK_ENABLED=${GO_EXTLINK_ENABLED:-0} # do not use host linker export GO_LDFLAGS=${GO_LDFLAGS:-"-linkmode 'external' -extldflags '-static' -s -w"} root_path=$(cd "$(dirname "$0")"; pwd) diff --git a/server/infra/discovery/common.go b/server/infra/discovery/common.go index 965b4c12..eccaaa5c 100644 --- a/server/infra/discovery/common.go +++ b/server/infra/discovery/common.go @@ -21,8 +21,6 @@ import ( ) const ( - // re-list when there is no event coming in more than 1h(=120*30s) - DEFAULT_MAX_NO_EVENT_INTERVAL = 120 - DEFAULT_TIMEOUT = 30 * time.Second - DEFAULT_CACHE_INIT_SIZE = 100 + DEFAULT_TIMEOUT = 30 * time.Second + DEFAULT_CACHE_INIT_SIZE = 100 ) 
diff --git a/server/infra/discovery/config.go b/server/infra/discovery/config.go index f9bdc2ac..076053d3 100644 --- a/server/infra/discovery/config.go +++ b/server/infra/discovery/config.go @@ -24,14 +24,13 @@ import ( type Config struct { // Key is the prefix to unique specify resource type - Key string - InitSize int - NoEventPeriods int - Timeout time.Duration - Period time.Duration - DeferHandler DeferHandler - OnEvent KvEventFunc - Parser pb.Parser + Key string + InitSize int + Timeout time.Duration + Period time.Duration + DeferHandler DeferHandler + OnEvent KvEventFunc + Parser pb.Parser } func (cfg *Config) String() string { @@ -69,11 +68,6 @@ func (cfg *Config) WithEventFunc(f KvEventFunc) *Config { return cfg } -func (cfg *Config) WithNoEventPeriods(p int) *Config { - cfg.NoEventPeriods = p - return cfg -} - func (cfg *Config) AppendEventFunc(f KvEventFunc) *Config { if prev := cfg.OnEvent; prev != nil { next := f @@ -93,11 +87,10 @@ func (cfg *Config) WithParser(parser pb.Parser) *Config { func Configure() *Config { return &Config{ - Key: "/", - Timeout: DEFAULT_TIMEOUT, - Period: time.Second, - NoEventPeriods: DEFAULT_MAX_NO_EVENT_INTERVAL, - InitSize: DEFAULT_CACHE_INIT_SIZE, - Parser: pb.BytesParser, + Key: "/", + Timeout: DEFAULT_TIMEOUT, + Period: time.Second, + InitSize: DEFAULT_CACHE_INIT_SIZE, + Parser: pb.BytesParser, } } diff --git a/server/infra/discovery/config_test.go b/server/infra/discovery/config_test.go index 202537d7..0adfa0b6 100644 --- a/server/infra/discovery/config_test.go +++ b/server/infra/discovery/config_test.go @@ -55,10 +55,6 @@ func TestConfigure(t *testing.T) { if cfg.Period != 3*time.Second { t.Fatalf("TestConfigure failed") } - cfg.WithNoEventPeriods(1) - if cfg.NoEventPeriods != 1 { - t.Fatalf("TestConfigure failed") - } cfg.WithDeferHandler(&mockDeferHandler{}) if cfg.DeferHandler == nil { t.Fatalf("TestConfigure failed") diff --git a/server/plugin/infra/discovery/etcd/cacher_kv.go 
b/server/plugin/infra/discovery/etcd/cacher_kv.go index 31c42751..0cade627 100644 --- a/server/plugin/infra/discovery/etcd/cacher_kv.go +++ b/server/plugin/infra/discovery/etcd/cacher_kv.go @@ -36,8 +36,8 @@ import ( type KvCacher struct { Cfg *discovery.Config - latestListRev int64 - noEventPeriods int + latestListRev int64 + reListCount int ready chan struct{} lw ListWatch @@ -53,22 +53,16 @@ func (c *KvCacher) Config() *discovery.Config { func (c *KvCacher) needList() bool { rev := c.lw.Revision() + // init stage or there is a backend error if rev == 0 { - c.noEventPeriods = 0 + c.reListCount = 0 return true } - if c.latestListRev != rev { - c.noEventPeriods = 0 + c.reListCount++ + if c.reListCount < DEFAULT_FORCE_LIST_INTERVAL { return false } - c.noEventPeriods++ - if c.Cfg.NoEventPeriods == 0 || c.noEventPeriods < c.Cfg.NoEventPeriods { - return false - } - - log.Debugf("no events come in more then %s, need to list key %s, rev: %d", - time.Duration(c.noEventPeriods)*c.Cfg.Timeout, c.Cfg.Key, rev) - c.noEventPeriods = 0 + c.reListCount = 0 return true } diff --git a/server/plugin/infra/discovery/etcd/cacher_kv_test.go b/server/plugin/infra/discovery/etcd/cacher_kv_test.go index 1ee4650d..5ac42f84 100644 --- a/server/plugin/infra/discovery/etcd/cacher_kv_test.go +++ b/server/plugin/infra/discovery/etcd/cacher_kv_test.go @@ -97,7 +97,6 @@ func TestNewKvCacher(t *testing.T) { var evt discovery.KvEvent cr = &KvCacher{ Cfg: discovery.Configure(). - WithNoEventPeriods(0). 
WithEventFunc(func(e discovery.KvEvent) { evt = e }), @@ -151,11 +150,13 @@ func TestNewKvCacher(t *testing.T) { } // case re-list and over no event times - lw.Bus <- nil + for i := 0; i < DEFAULT_FORCE_LIST_INTERVAL; i++ { + lw.Bus <- nil + } evt.KV = nil - old := *cr.Cfg - cr.Cfg.WithNoEventPeriods(1) - cr.refresh(ctx) + for i := 0; i < DEFAULT_FORCE_LIST_INTERVAL; i++ { + cr.refresh(ctx) + } // check event if evt.Type != pb.EVT_UPDATE || evt.Revision != 4 || evt.KV.ModRevision != 3 || string(evt.KV.Key) != "ka" || string(evt.KV.Value.([]byte)) != "va" { t.Fatalf("TestNewKvCacher failed, %v", evt) @@ -167,10 +168,13 @@ func TestNewKvCacher(t *testing.T) { } lw.ListResponse = &registry.PluginResponse{Revision: 5} - lw.Bus <- nil + for i := 0; i < DEFAULT_FORCE_LIST_INTERVAL; i++ { + lw.Bus <- nil + } evt.KV = nil - cr.refresh(ctx) - *cr.Cfg = old + for i := 0; i < DEFAULT_FORCE_LIST_INTERVAL; i++ { + cr.refresh(ctx) + } // check event if evt.Type != pb.EVT_DELETE || evt.Revision != 5 || evt.KV.ModRevision != 3 || string(evt.KV.Key) != "ka" || string(evt.KV.Value.([]byte)) != "va" { t.Fatalf("TestNewKvCacher failed, %v", evt) @@ -281,7 +285,7 @@ func TestNewKvCacher(t *testing.T) { lw.ListResponse = test lw.Bus <- nil evt.KV = nil - old = *cr.Cfg + old := *cr.Cfg cr.Cfg.WithParser(pb.MapParser) cr.refresh(ctx) // check event diff --git a/server/plugin/infra/discovery/etcd/common.go b/server/plugin/infra/discovery/etcd/common.go index fe1663db..d7891674 100644 --- a/server/plugin/infra/discovery/etcd/common.go +++ b/server/plugin/infra/discovery/etcd/common.go @@ -23,9 +23,11 @@ import ( ) const ( - DEFAULT_METRICS_INTERVAL = 30 * time.Second - DEFAULT_COMPACT_TIMES = 2 - DEFAULT_COMPACT_TIMEOUT = 5 * time.Minute + // force re-list + DEFAULT_FORCE_LIST_INTERVAL = 4 + DEFAULT_METRICS_INTERVAL = 30 * time.Second + DEFAULT_COMPACT_TIMES = 2 + DEFAULT_COMPACT_TIMEOUT = 5 * time.Minute minWaitInterval = 1 * time.Second eventBlockSize = 1000 diff --git 
a/server/service/notification/processor_test.go b/server/service/notification/processor_test.go index ed5f2702..7c22096c 100644 --- a/server/service/notification/processor_test.go +++ b/server/service/notification/processor_test.go @@ -19,11 +19,24 @@ package notification import ( "github.com/apache/incubator-servicecomb-service-center/pkg/gopool" "testing" + "time" ) +type mockSubscriberChan struct { + *BaseSubscriber + job chan NotifyJob +} + +func (s *mockSubscriberChan) OnMessage(job NotifyJob) { + s.job <- job +} + func TestProcessor_Do(t *testing.T) { - mock1 := &mockSubscriber{BaseSubscriber: NewSubscriber(INSTANCE, "s1", "g1")} - mock2 := &mockSubscriber{BaseSubscriber: NewSubscriber(INSTANCE, "s1", "g2")} + delay := 50 * time.Millisecond + mock1 := &mockSubscriberChan{BaseSubscriber: NewSubscriber(INSTANCE, "s1", "g1"), + job: make(chan NotifyJob, 1)} + mock2 := &mockSubscriberChan{BaseSubscriber: NewSubscriber(INSTANCE, "s1", "g2"), + job: make(chan NotifyJob, 1)} p := NewProcessor("p1", 0) gopool.Go(p.Do) if p.Name() != "p1" { @@ -48,27 +61,55 @@ func TestProcessor_Do(t *testing.T) { t.Fatalf("TestProcessor_Do") } p.AddSubscriber(mock1) + p.AddSubscriber(mock2) job := &BaseNotifyJob{group: "g1"} p.Accept(job) - if mock1.job != nil { + select { + case <-mock1.job: t.Fatalf("TestProcessor_Do") + case <-time.After(delay): } job.subject = "s1" job.group = "g3" p.Accept(job) - if mock1.job != nil { + select { + case <-mock1.job: t.Fatalf("TestProcessor_Do") + case <-time.After(delay): } job.subject = "s1" job.group = "g1" p.Accept(job) - if mock1.job != job || mock2.job != nil { + select { + case j := <-mock1.job: + if j != job { + t.Fatalf("TestProcessor_Do") + } + case <-time.After(delay): t.Fatalf("TestProcessor_Do") } + select { + case <-mock2.job: + t.Fatalf("TestProcessor_Do") + case <-time.After(delay): + } job.subject = "s1" job.group = "" p.Accept(job) - if mock1.job != job && mock2.job != job { + select { + case j := <-mock1.job: + if j != job { + 
t.Fatalf("TestProcessor_Do") + } + case <-time.After(delay): + t.Fatalf("TestProcessor_Do") + } + select { + case j := <-mock2.job: + if j != job { + t.Fatalf("TestProcessor_Do") + } + case <-time.After(delay): t.Fatalf("TestProcessor_Do") } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Etcd cacher should re-list etcd in fixed time interval > ------------------------------------------------------ > > Key: SCB-924 > URL: https://issues.apache.org/jira/browse/SCB-924 > Project: Apache ServiceComb > Issue Type: Bug > Components: Service-Center > Reporter: little-cui > Assignee: little-cui > Priority: Major > Fix For: service-center-1.1.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)