This is an automated email from the ASF dual-hosted git repository.
asifdxtreme pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new cac383e SCB-343 Batch delete micro-services and enable tracing will cause SC crash (#282)
cac383e is described below
commit cac383e50f07f8ba4a526b2651f99d7eb47433b6
Author: little-cui <[email protected]>
AuthorDate: Tue Feb 13 12:36:35 2018 +0800
SCB-343 Batch delete micro-services and enable tracing will cause SC crash (#282)
SCB-343 Batch delete micro-services and enable tracing will cause SC crash
---
pkg/chain/handler.go | 5 +-
pkg/etcdsync/mutex.go | 44 ++--
pkg/grace/grace.go | 45 +++--
pkg/httplimiter/httpratelimiter.go | 221 ---------------------
pkg/ratelimiter/ratelimiter.go | 118 -----------
pkg/util/concurrent_map.go | 120 +++++++++++
pkg/util/concurrent_map_test.go | 161 +++++++++++++++
pkg/util/context.go | 29 +--
pkg/util/goroutines_test.go | 18 +-
pkg/util/log_test.go | 13 +-
pkg/util/net.go | 31 ++-
pkg/util/net_test.go | 11 +
pkg/{validate => util}/reflect.go | 18 +-
.../util/reflect_test.go | 45 ++++-
pkg/util/sys.go | 9 +
pkg/util/tree_test.go | 4 +-
pkg/util/uniqueue_test.go | 6 -
pkg/util/util.go | 9 +
pkg/util/util_test.go | 7 +
pkg/validate/validate.go | 2 +-
server/bootstrap/bootstrap.go | 2 -
server/interceptor/ratelimiter/limiter.go | 89 ---------
server/interceptor/ratelimiter/limiter_test.go | 83 --------
server/server.go | 5 +-
server/service/instances.go | 3 +-
server/service/microservices.go | 12 +-
server/service/util/instance_util.go | 12 --
server/service/util/instance_util_test.go | 12 --
server/service/util/rule_util.go | 1 -
29 files changed, 466 insertions(+), 669 deletions(-)
diff --git a/pkg/chain/handler.go b/pkg/chain/handler.go
index 46d7491..be48299 100644
--- a/pkg/chain/handler.go
+++ b/pkg/chain/handler.go
@@ -18,13 +18,12 @@ package chain
import (
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
- "github.com/apache/incubator-servicecomb-service-center/pkg/validate"
"reflect"
)
const CAP_SIZE = 10
-var handlersMap map[string][]Handler = make(map[string][]Handler, CAP_SIZE)
+var handlersMap = make(map[string][]Handler, CAP_SIZE)
type Handler interface {
Handle(i *Invocation)
@@ -38,7 +37,7 @@ func RegisterHandler(catalog string, h Handler) {
handlers = append(handlers, h)
handlersMap[catalog] = handlers
- t := validate.LoadStruct(reflect.ValueOf(h).Elem().Interface())
+ t := util.LoadStruct(reflect.ValueOf(h).Elem().Interface())
util.Logger().Infof("register handler[%s] %s/%s", catalog,
t.Type.PkgPath(), t.Type.Name())
}
diff --git a/pkg/etcdsync/mutex.go b/pkg/etcdsync/mutex.go
index 3cb4bcf..f54ef6f 100644
--- a/pkg/etcdsync/mutex.go
+++ b/pkg/etcdsync/mutex.go
@@ -23,24 +23,22 @@ import (
"github.com/apache/incubator-servicecomb-service-center/server/infra/registry"
"github.com/coreos/etcd/client"
"golang.org/x/net/context"
- "io"
"os"
"sync"
"time"
)
const (
- defaultTTL = 60
- defaultTry = 3
- ROOT_PATH = "/cse/etcdsync"
+ DEFAULT_LOCK_TTL = 60
+ DEFAULT_RETRY_TIMES = 3
+ ROOT_PATH = "/cse/etcdsync"
)
type DLockFactory struct {
- key string
- ctx context.Context
- ttl int64
- mutex *sync.Mutex
- logger io.Writer
+ key string
+ ctx context.Context
+ ttl int64
+ mutex *sync.Mutex
}
type DLock struct {
@@ -49,31 +47,19 @@ type DLock struct {
}
var (
- globalMap map[string]*DLockFactory
+ globalMap = make(map[string]*DLockFactory)
globalMux sync.Mutex
IsDebug bool
- hostname string
- pid int
+ hostname string = util.HostName()
+ pid int = os.Getpid()
)
-func init() {
- globalMap = make(map[string]*DLockFactory)
- IsDebug = false
-
- var err error
- hostname, err = os.Hostname()
- if err != nil {
- hostname = "UNKNOWN"
- }
- pid = os.Getpid()
-}
-
func NewLockFactory(key string, ttl int64) *DLockFactory {
if len(key) == 0 {
return nil
}
if ttl < 1 {
- ttl = defaultTTL
+ ttl = DEFAULT_LOCK_TTL
}
return &DLockFactory{
@@ -92,7 +78,7 @@ func (m *DLockFactory) NewDLock(wait bool) (l *DLock, err error) {
builder: m,
id: fmt.Sprintf("%v-%v-%v", hostname, pid,
time.Now().Format("20060102-15:04:05.999999999")),
}
- for try := 1; try <= defaultTry; try++ {
+ for try := 1; try <= DEFAULT_RETRY_TIMES; try++ {
err = l.Lock(wait)
if err == nil {
return l, nil
@@ -103,7 +89,7 @@ func (m *DLockFactory) NewDLock(wait bool) (l *DLock, err error) {
break
}
- if try <= defaultTry {
+ if try <= DEFAULT_RETRY_TIMES {
util.Logger().Warnf(err, "Try to lock key %s again,
id=%s", m.key, l.id)
} else {
util.Logger().Errorf(err, "Lock key %s failed, id=%s",
m.key, l.id)
@@ -147,7 +133,7 @@ func (m *DLock) Lock(wait bool) error {
util.Logger().Warnf(err, "Key %s is locked, waiting for other
node releases it, id=%s", m.builder.key, m.id)
- ctx, cancel := context.WithTimeout(m.builder.ctx,
defaultTTL*time.Second)
+ ctx, cancel := context.WithTimeout(m.builder.ctx,
DEFAULT_LOCK_TTL*time.Second)
go func() {
err := backend.Registry().Watch(ctx,
registry.WithStrKey(m.builder.key),
@@ -179,7 +165,7 @@ func (m *DLock) Unlock() (err error) {
registry.DEL,
registry.WithStrKey(m.builder.key)}
- for i := 1; i <= defaultTry; i++ {
+ for i := 1; i <= DEFAULT_RETRY_TIMES; i++ {
_, err = backend.Registry().Do(m.builder.ctx, opts...)
if err == nil {
if !IsDebug {
diff --git a/pkg/grace/grace.go b/pkg/grace/grace.go
index dc212f7..fef6caf 100644
--- a/pkg/grace/grace.go
+++ b/pkg/grace/grace.go
@@ -146,26 +146,14 @@ func fork() (err error) {
for name, i := range filesOffsetMap {
orderArgs[i] = name
}
- path := os.Args[0]
- var args []string
- if len(os.Args) > 1 {
- for _, arg := range os.Args[1:] {
- if arg == "-fork" {
- break
- }
- args = append(args, arg)
- }
- }
- args = append(args, "-fork")
+
+ // add fork and file descriptions order flags
+ args := append(parseCommandLine(), "-fork")
if len(filesOffsetMap) > 0 {
args = append(args, fmt.Sprintf(`-filesorder=%s`,
strings.Join(orderArgs, ",")))
}
- cmd := exec.Command(path, args...)
- cmd.Stdout = os.Stdout
- cmd.Stderr = os.Stderr
- cmd.ExtraFiles = files
- err = cmd.Start()
- if err != nil {
+
+ if err = newCommand(args...); err != nil {
util.Logger().Errorf(err, "fork a process failed, %v", args)
return
}
@@ -173,6 +161,29 @@ func fork() (err error) {
return
}
+func parseCommandLine() (args []string) {
+ if len(os.Args) <= 1 {
+ return
+ }
+ // ignore process path
+ for _, arg := range os.Args[1:] {
+ if arg == "-fork" {
+ // ignore fork flags
+ break
+ }
+ args = append(args, arg)
+ }
+ return
+}
+
+func newCommand(args ...string) error {
+ cmd := exec.Command(os.Args[0], args...)
+ cmd.Stdout = os.Stdout
+ cmd.Stderr = os.Stderr
+ cmd.ExtraFiles = files
+ return cmd.Start()
+}
+
func IsFork() bool {
return isFork
}
diff --git a/pkg/httplimiter/httpratelimiter.go b/pkg/httplimiter/httpratelimiter.go
deleted file mode 100644
index 3110056..0000000
--- a/pkg/httplimiter/httpratelimiter.go
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package httplimiter
-
-import (
- "fmt"
- "github.com/apache/incubator-servicecomb-service-center/pkg/ratelimiter"
- "net/http"
- "strconv"
- "strings"
- "sync"
- "time"
-)
-
-type HTTPErrorMessage struct {
- Message string
- StatusCode int
-}
-
-func (httpErrorMessage *HTTPErrorMessage) Error() string {
- return fmt.Sprintf("%v: %v", httpErrorMessage.StatusCode,
httpErrorMessage.Message)
-}
-
-type HttpLimiter struct {
- HttpMessage string
- ContentType string
- StatusCode int
- RequestLimit int64
- TTL time.Duration
- IPLookups []string
- Methods []string
- Headers map[string][]string
- BasicAuthUsers []string
- leakyBuckets map[string]*ratelimiter.LeakyBucket
- sync.RWMutex
-}
-
-func LimitBySegments(limiter *HttpLimiter, keys []string) *HTTPErrorMessage {
- if limiter.LimitExceeded(strings.Join(keys, "|")) {
- return &HTTPErrorMessage{Message: limiter.HttpMessage,
StatusCode: limiter.StatusCode}
- }
-
- return nil
-}
-
-func LimitByRequest(httpLimiter *HttpLimiter, r *http.Request)
*HTTPErrorMessage {
- sliceKeys := BuildSegments(httpLimiter, r)
-
- for _, keys := range sliceKeys {
- httpError := LimitBySegments(httpLimiter, keys)
- if httpError != nil {
- return httpError
- }
- }
-
- return nil
-}
-
-func BuildSegments(httpLimiter *HttpLimiter, r *http.Request) [][]string {
- remoteIP := getRemoteIP(httpLimiter.IPLookups, r)
- urlPath := r.URL.Path
- sliceKeys := make([][]string, 0)
-
- if remoteIP == "" {
- return sliceKeys
- }
-
- if httpLimiter.Methods != nil && httpLimiter.Headers != nil &&
httpLimiter.BasicAuthUsers != nil {
- if checkExistence(httpLimiter.Methods, r.Method) {
- for headerKey, headerValues := range
httpLimiter.Headers {
- if (headerValues == nil || len(headerValues) <=
0) && r.Header.Get(headerKey) != "" {
- username, _, ok := r.BasicAuth()
- if ok &&
checkExistence(httpLimiter.BasicAuthUsers, username) {
- sliceKeys = append(sliceKeys,
[]string{remoteIP, urlPath, r.Method, headerKey, username})
- }
-
- } else if len(headerValues) > 0 &&
r.Header.Get(headerKey) != "" {
- for _, headerValue := range
headerValues {
- username, _, ok := r.BasicAuth()
- if ok &&
checkExistence(httpLimiter.BasicAuthUsers, username) {
- sliceKeys =
append(sliceKeys, []string{remoteIP, urlPath, r.Method, headerKey, headerValue,
username})
- }
- }
- }
- }
- }
-
- } else if httpLimiter.Methods != nil && httpLimiter.Headers != nil {
- if checkExistence(httpLimiter.Methods, r.Method) {
- for headerKey, headerValues := range
httpLimiter.Headers {
- if (headerValues == nil || len(headerValues) <=
0) && r.Header.Get(headerKey) != "" {
- sliceKeys = append(sliceKeys,
[]string{remoteIP, urlPath, r.Method, headerKey})
-
- } else if len(headerValues) > 0 &&
r.Header.Get(headerKey) != "" {
- for _, headerValue := range
headerValues {
- sliceKeys = append(sliceKeys,
[]string{remoteIP, urlPath, r.Method, headerKey, headerValue})
- }
- }
- }
- }
-
- } else if httpLimiter.Methods != nil && httpLimiter.BasicAuthUsers !=
nil {
- if checkExistence(httpLimiter.Methods, r.Method) {
- username, _, ok := r.BasicAuth()
- if ok && checkExistence(httpLimiter.BasicAuthUsers,
username) {
- sliceKeys = append(sliceKeys,
[]string{remoteIP, urlPath, r.Method, username})
- }
- }
-
- } else if httpLimiter.Methods != nil {
- if checkExistence(httpLimiter.Methods, r.Method) {
- sliceKeys = append(sliceKeys, []string{remoteIP,
urlPath, r.Method})
- }
-
- } else if httpLimiter.Headers != nil {
- for headerKey, headerValues := range httpLimiter.Headers {
- if (headerValues == nil || len(headerValues) <= 0) &&
r.Header.Get(headerKey) != "" {
- sliceKeys = append(sliceKeys,
[]string{remoteIP, urlPath, headerKey})
-
- } else if len(headerValues) > 0 &&
r.Header.Get(headerKey) != "" {
- for _, headerValue := range headerValues {
- sliceKeys = append(sliceKeys,
[]string{remoteIP, urlPath, headerKey, headerValue})
- }
- }
- }
-
- } else if httpLimiter.BasicAuthUsers != nil {
- username, _, ok := r.BasicAuth()
- if ok && checkExistence(httpLimiter.BasicAuthUsers, username) {
- sliceKeys = append(sliceKeys, []string{remoteIP,
urlPath, username})
- }
- } else {
- sliceKeys = append(sliceKeys, []string{remoteIP, urlPath})
- }
-
- return sliceKeys
-}
-
-func SetResponseHeaders(limiter *HttpLimiter, w http.ResponseWriter) {
- w.Header().Add("X-Rate-Limit-Limit",
strconv.FormatInt(limiter.RequestLimit, 10))
- w.Header().Add("X-Rate-Limit-Duration", limiter.TTL.String())
-}
-
-func checkExistence(sliceString []string, needle string) bool {
- for _, b := range sliceString {
- if b == needle {
- return true
- }
- }
- return false
-}
-
-func ipAddrFromRemoteAddr(s string) string {
- idx := strings.LastIndex(s, ":")
- if idx == -1 {
- return s
- }
- return s[:idx]
-}
-
-func getRemoteIP(ipLookups []string, r *http.Request) string {
- realIP := r.Header.Get("X-Real-IP")
- forwardedFor := r.Header.Get("X-Forwarded-For")
-
- for _, lookup := range ipLookups {
- if lookup == "RemoteAddr" {
- return ipAddrFromRemoteAddr(r.RemoteAddr)
- }
- if lookup == "X-Forwarded-For" && forwardedFor != "" {
- parts := strings.Split(forwardedFor, ",")
- for i, p := range parts {
- parts[i] = strings.TrimSpace(p)
- }
- return parts[0]
- }
- if lookup == "X-Real-IP" && realIP != "" {
- return realIP
- }
- }
-
- return ""
-}
-
-func NewHttpLimiter(max int64, ttl time.Duration) *HttpLimiter {
- limiter := &HttpLimiter{RequestLimit: max, TTL: ttl}
- limiter.ContentType = "text/plain; charset=utf-8"
- limiter.HttpMessage = "You have reached maximum request limit."
- limiter.StatusCode = http.StatusTooManyRequests
- limiter.leakyBuckets = make(map[string]*ratelimiter.LeakyBucket)
- limiter.IPLookups = []string{"RemoteAddr", "X-Forwarded-For",
"X-Real-IP"}
-
- return limiter
-}
-
-func (rateLimiter *HttpLimiter) LimitExceeded(key string) bool {
- rateLimiter.Lock()
- if _, found := rateLimiter.leakyBuckets[key]; !found {
- rateLimiter.leakyBuckets[key] =
ratelimiter.NewLeakyBucket(rateLimiter.TTL, rateLimiter.RequestLimit,
rateLimiter.RequestLimit)
- }
- _, isInLimits := rateLimiter.leakyBuckets[key].MaximumTakeDuration(1, 0)
- rateLimiter.Unlock()
- if isInLimits {
- return false
- }
- return true
-}
diff --git a/pkg/ratelimiter/ratelimiter.go b/pkg/ratelimiter/ratelimiter.go
deleted file mode 100644
index a46bc39..0000000
--- a/pkg/ratelimiter/ratelimiter.go
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package ratelimiter
-
-import (
- "sync"
- "time"
-)
-
-type LeakyBucket struct {
- startTime time.Time
- capacity int64
- quantum int64
- interval time.Duration
- mutex sync.Mutex
- available int64
- availableTicker int64
-}
-
-func NewLeakyBucket(fillInterval time.Duration, capacity, quantum int64)
*LeakyBucket {
- if fillInterval <= 0 {
- panic("leaky bucket fill interval is not > 0")
- }
- if capacity <= 0 {
- panic("leaky bucket capacity is not > 0")
- }
- if quantum <= 0 {
- panic("leaky bucket quantum is not > 0")
- }
- return &LeakyBucket{
- startTime: time.Now(),
- capacity: capacity,
- quantum: quantum,
- available: capacity,
- interval: fillInterval,
- }
-}
-
-func (leakyBucket *LeakyBucket) Wait(count int64) {
- if d := leakyBucket.Take(count); d > 0 {
- time.Sleep(d)
- }
-}
-
-func (leakyBucket *LeakyBucket) MaximumWaitDuration(count int64, maxWait
time.Duration) bool {
- d, ok := leakyBucket.MaximumTakeDuration(count, maxWait)
- if d > 0 {
- time.Sleep(d)
- }
- return ok
-}
-
-const sleepForever time.Duration = 0x7fffffffffffffff
-
-func (leakyBucket *LeakyBucket) Take(count int64) time.Duration {
- d, _ := leakyBucket.take(time.Now(), count, sleepForever)
- return d
-}
-
-func (leakyBucket *LeakyBucket) MaximumTakeDuration(count int64, maxWait
time.Duration) (time.Duration, bool) {
- return leakyBucket.take(time.Now(), count, maxWait)
-}
-
-func (leakyBucket *LeakyBucket) Rate() float64 {
- return 1e9 * float64(leakyBucket.quantum) /
float64(leakyBucket.interval)
-}
-
-func (leakyBucket *LeakyBucket) take(now time.Time, count int64, maxWait
time.Duration) (time.Duration, bool) {
- if count <= 0 {
- return 0, true
- }
- leakyBucket.mutex.Lock()
- defer leakyBucket.mutex.Unlock()
-
- currentTick := leakyBucket.adjust(now)
- avail := leakyBucket.available - count
- if avail >= 0 {
- leakyBucket.available = avail
- return 0, true
- }
- endTick := currentTick +
(-avail+leakyBucket.quantum-1)/leakyBucket.quantum
- endTime := leakyBucket.startTime.Add(time.Duration(endTick) *
leakyBucket.interval)
- waitTime := endTime.Sub(now)
- if waitTime > maxWait {
- return 0, false
- }
- leakyBucket.available = avail
- return waitTime, true
-}
-
-func (leakyBucket *LeakyBucket) adjust(now time.Time) (currentTick int64) {
- currentTick = int64(now.Sub(leakyBucket.startTime) /
leakyBucket.interval)
-
- if leakyBucket.available >= leakyBucket.capacity {
- return
- }
- leakyBucket.available += (currentTick - leakyBucket.availableTicker) *
leakyBucket.quantum
- if leakyBucket.available > leakyBucket.capacity {
- leakyBucket.available = leakyBucket.capacity
- }
- leakyBucket.availableTicker = currentTick
- return
-}
diff --git a/pkg/util/concurrent_map.go b/pkg/util/concurrent_map.go
new file mode 100644
index 0000000..b7ec039
--- /dev/null
+++ b/pkg/util/concurrent_map.go
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package util
+
+import "sync"
+
+type MapItem struct {
+ Key interface{}
+ Value interface{}
+}
+
+type ConcurrentMap struct {
+ items map[interface{}]interface{}
+ size int
+ mux sync.RWMutex
+ once sync.Once
+}
+
+func (cm *ConcurrentMap) resize() {
+ cm.items = make(map[interface{}]interface{}, cm.size)
+}
+
+func (cm *ConcurrentMap) init() {
+ cm.once.Do(cm.resize)
+}
+
+func (cm *ConcurrentMap) Put(key, val interface{}) (old interface{}) {
+ cm.init()
+ cm.mux.Lock()
+ old, cm.items[key] = cm.items[key], val
+ cm.mux.Unlock()
+ return
+}
+
+func (cm *ConcurrentMap) PutIfAbsent(key, val interface{}) (old interface{}) {
+ var b bool
+ cm.init()
+ cm.mux.Lock()
+ old, b = cm.items[key]
+ if !b {
+ cm.items[key] = val
+ }
+ cm.mux.Unlock()
+ return
+}
+
+func (cm *ConcurrentMap) Get(key interface{}) (val interface{}, b bool) {
+ cm.init()
+ cm.mux.RLock()
+ val, b = cm.items[key]
+ cm.mux.RUnlock()
+ return
+}
+
+func (cm *ConcurrentMap) Remove(key interface{}) (old interface{}) {
+ var b bool
+ cm.init()
+ cm.mux.Lock()
+ old, b = cm.items[key]
+ if b {
+ delete(cm.items, key)
+ }
+ cm.mux.Unlock()
+ return
+}
+
+func (cm *ConcurrentMap) Clear() {
+ cm.mux.Lock()
+ cm.resize()
+ cm.mux.Unlock()
+}
+
+func (cm *ConcurrentMap) Size() (s int) {
+ return len(cm.items)
+}
+
+func (cm *ConcurrentMap) ForEach(f func(item MapItem) (next bool)) {
+ cm.mux.RLock()
+ s := len(cm.items)
+ if s == 0 {
+ cm.mux.RUnlock()
+ return
+ }
+ // avoid dead lock in function 'f'
+ ch := make(chan MapItem, s)
+ for k, v := range cm.items {
+ ch <- MapItem{k, v}
+ }
+ cm.mux.RUnlock()
+
+ for {
+ select {
+ case item := <-ch:
+ if b := f(item); b {
+ continue
+ }
+ default:
+ }
+ break
+ }
+ close(ch)
+}
+
+func NewConcurrentMap(size int) ConcurrentMap {
+ return ConcurrentMap{size: size}
+}
diff --git a/pkg/util/concurrent_map_test.go b/pkg/util/concurrent_map_test.go
new file mode 100644
index 0000000..90fa711
--- /dev/null
+++ b/pkg/util/concurrent_map_test.go
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package util
+
+import (
+ "testing"
+)
+
+func TestConcurrentMap(t *testing.T) {
+ cm := ConcurrentMap{}
+ s := cm.Size()
+ if s != 0 {
+ fail(t, "TestConcurrentMap Size failed.")
+ }
+ v, b := cm.Get("a")
+ if b || v != nil {
+ fail(t, "TestConcurrentMap Get a not exist item failed.")
+ }
+ v = cm.Put("a", "1")
+ if v != nil {
+ fail(t, "TestConcurrentMap Put a new item failed.")
+ }
+ v, b = cm.Get("a")
+ if !b || v.(string) != "1" {
+ fail(t, "TestConcurrentMap Get an exist item failed.")
+ }
+ v = cm.Put("a", "2")
+ if v.(string) != "1" {
+ fail(t, "TestConcurrentMap Put an item again failed.")
+ }
+ v = cm.PutIfAbsent("b", "1")
+ if v != nil {
+ fail(t, "TestConcurrentMap PutIfAbsent a not exist item
failed.")
+ }
+ v = cm.PutIfAbsent("a", "3")
+ if v.(string) != "2" {
+ fail(t, "TestConcurrentMap PutIfAbsent an item failed.")
+ }
+ v, b = cm.Get("a")
+ if !b || v.(string) != "2" {
+ fail(t, "TestConcurrentMap Get an item after PutIfAbsent
failed.")
+ }
+ v = cm.Remove("a")
+ if v.(string) != "2" {
+ fail(t, "TestConcurrentMap Remove an item failed.")
+ }
+ v, b = cm.Get("a")
+ if b || v != nil {
+ fail(t, "TestConcurrentMap Get an item after Remove failed.")
+ }
+ s = cm.Size()
+ if s != 1 { // only 'b' is left
+ fail(t, "TestConcurrentMap Size after Put failed.")
+ }
+ cm.Clear()
+ s = cm.Size()
+ if s != 0 {
+ fail(t, "TestConcurrentMap Size after Clear failed.")
+ }
+}
+
+func TestConcurrentMap_ForEach(t *testing.T) {
+ l := 0
+ cm := ConcurrentMap{}
+ cm.ForEach(func(item MapItem) bool {
+ l++
+ return true
+ })
+ if l != 0 {
+ fail(t, "TestConcurrentMap_ForEach failed.")
+ }
+ for i := 0; i < 1000; i++ {
+ cm.Put(i, i)
+ }
+ cm.ForEach(func(item MapItem) bool {
+ l++
+ cm.Remove(item.Key)
+ return true
+ })
+ if l != 1000 || cm.Size() != 0 {
+ fail(t, "TestConcurrentMap_ForEach does not empty failed.")
+ }
+}
+
+func TestNewConcurrentMap(t *testing.T) {
+ cm := NewConcurrentMap(100)
+ if cm.size != 100 {
+ fail(t, "TestNewConcurrentMap failed.")
+ }
+}
+
+func BenchmarkConcurrentMap_Get(b *testing.B) {
+ var v interface{}
+ cm := ConcurrentMap{}
+ cm.Put("a", "1")
+ b.ResetTimer()
+ b.RunParallel(func(pb *testing.PB) {
+ for pb.Next() {
+ v, _ = cm.Get("a")
+ }
+ })
+ b.ReportAllocs()
+ // 20000000 88.7 ns/op 0 B/op 0
allocs/op
+}
+
+func BenchmarkConcurrentMap_Put(b *testing.B) {
+ cm := &ConcurrentMap{}
+ b.ResetTimer()
+ b.RunParallel(func(pb *testing.PB) {
+ for pb.Next() {
+ cm.Put("a", "1")
+ }
+ })
+ b.ReportAllocs()
+ // 3000000 420 ns/op 32 B/op 2
allocs/op
+}
+
+func BenchmarkConcurrentMap_PutAndGet(b *testing.B) {
+ var v interface{}
+ cm := &ConcurrentMap{}
+ b.ResetTimer()
+ b.RunParallel(func(pb *testing.PB) {
+ for pb.Next() {
+ cm.Put("a", "1")
+ v, _ = cm.Get("a")
+ }
+ })
+ b.ReportAllocs()
+ // 3000000 560 ns/op 32 B/op 2
allocs/op
+}
+
+func BenchmarkConcurrentMap_ForEach(b *testing.B) {
+ cm := ConcurrentMap{}
+ for i := 0; i < 100; i++ {
+ cm.Put(i, i)
+ }
+ b.ResetTimer()
+ b.RunParallel(func(pb *testing.PB) {
+ for pb.Next() {
+ cm.ForEach(func(item MapItem) bool {
+ return true
+ })
+ }
+ })
+ b.ReportAllocs()
+ // 500000 3148 ns/op 3296 B/op 2
allocs/op
+}
diff --git a/pkg/util/context.go b/pkg/util/context.go
index 41b41e4..4bc9b3a 100644
--- a/pkg/util/context.go
+++ b/pkg/util/context.go
@@ -24,7 +24,7 @@ import (
type StringContext struct {
parentCtx context.Context
- kv map[string]interface{}
+ kv ConcurrentMap
}
func (c *StringContext) Deadline() (deadline time.Time, ok bool) {
@@ -44,11 +44,12 @@ func (c *StringContext) Value(key interface{}) interface{} {
if !ok {
return c.parentCtx.Value(key)
}
- return c.kv[k]
+ v, _ := c.kv.Get(k)
+ return v
}
func (c *StringContext) SetKV(key string, val interface{}) {
- c.kv[key] = val
+ c.kv.Put(key, val)
}
func NewStringContext(ctx context.Context) *StringContext {
@@ -56,7 +57,7 @@ func NewStringContext(ctx context.Context) *StringContext {
if !ok {
strCtx = &StringContext{
parentCtx: ctx,
- kv: make(map[string]interface{}, 10),
+ kv: NewConcurrentMap(10),
}
}
return strCtx
@@ -69,19 +70,23 @@ func SetContext(ctx context.Context, key string, val interface{}) context.Contex
}
func CloneContext(ctx context.Context) context.Context {
- strCtx := &StringContext{
- parentCtx: ctx,
- kv: make(map[string]interface{}, 10),
- }
-
old, ok := ctx.(*StringContext)
if !ok {
- return strCtx
+ return &StringContext{
+ parentCtx: ctx,
+ kv: NewConcurrentMap(10),
+ }
}
- for k, v := range old.kv {
- strCtx.kv[k] = v
+ strCtx := &StringContext{
+ parentCtx: ctx,
+ kv: NewConcurrentMap(old.kv.Size()),
}
+
+ old.kv.ForEach(func(item MapItem) bool {
+ strCtx.kv.Put(item.Key, item.Value)
+ return true
+ })
return strCtx
}
diff --git a/pkg/util/goroutines_test.go b/pkg/util/goroutines_test.go
index 592965c..cfc0919 100644
--- a/pkg/util/goroutines_test.go
+++ b/pkg/util/goroutines_test.go
@@ -33,15 +33,13 @@ func TestGoRoutine_Init(t *testing.T) {
test.Init(stopCh1)
c := test.StopCh()
if c != stopCh1 {
- fmt.Println("init GoRoutine failed.")
- t.Fail()
+ fail(t, "init GoRoutine failed.")
}
test.Init(stopCh2)
c = test.StopCh()
if c == stopCh2 {
- fmt.Println("init GoRoutine twice.")
- t.Fail()
+ fail(t, "init GoRoutine twice.")
}
}
@@ -53,8 +51,7 @@ func TestGoRoutine_Do(t *testing.T) {
defer close(stopCh)
select {
case <-neverStopCh:
- fmt.Println("neverStopCh should not be closed.")
- t.Fail()
+ fail(t, "neverStopCh should not be closed.")
case <-time.After(time.Second):
}
})
@@ -69,8 +66,7 @@ func TestGoRoutine_Do(t *testing.T) {
select {
case <-stopCh:
case <-time.After(time.Second):
- fmt.Println("time out to wait stopCh1 close.")
- t.Fail()
+ fail(t, "time out to wait stopCh1 close.")
}
})
close(stopCh1)
@@ -102,8 +98,7 @@ func TestGoRoutine_Wait(t *testing.T) {
test.Wait()
fmt.Println(resultArr)
if len(resultArr) != MAX {
- fmt.Println("fail to wait all goroutines finish.")
- t.Fail()
+ fail(t, "fail to wait all goroutines finish.")
}
}
@@ -114,8 +109,7 @@ func TestGoRoutine_Close(t *testing.T) {
select {
case <-stopCh:
case <-time.After(time.Second):
- fmt.Println("time out to wait stopCh close.")
- t.Fail()
+ fail(t, "time out to wait stopCh close.")
}
})
test.Close(true)
diff --git a/pkg/util/log_test.go b/pkg/util/log_test.go
index 0fe8e7c..bd030d0 100644
--- a/pkg/util/log_test.go
+++ b/pkg/util/log_test.go
@@ -17,7 +17,6 @@
package util
import (
- "fmt"
"testing"
)
@@ -25,25 +24,21 @@ func TestLogger(t *testing.T) {
CustomLogger("Not Exist", "testDefaultLOGGER")
l := Logger()
if l != LOGGER {
- fmt.Println("should equal to LOGGER")
- t.FailNow()
+ fail(t, "should equal to LOGGER")
}
CustomLogger("TestLogger", "testFuncName")
l = Logger()
if l == LOGGER || l == nil {
- fmt.Println("should create a new instance for 'TestLogger'")
- t.FailNow()
+ fail(t, "should create a new instance for 'TestLogger'")
}
s := Logger()
if l != s {
- fmt.Println("should be the same logger")
- t.FailNow()
+ fail(t, "should be the same logger")
}
CustomLogger("github.com/apache/incubator-servicecomb-service-center/pkg/util",
"testPkgPath")
l = Logger()
if l == LOGGER || l == nil {
- fmt.Println("should create a new instance for 'util'")
- t.FailNow()
+ fail(t, "should create a new instance for 'util'")
}
// l.Infof("OK")
}
diff --git a/pkg/util/net.go b/pkg/util/net.go
index 5791b4f..457f9b6 100644
--- a/pkg/util/net.go
+++ b/pkg/util/net.go
@@ -21,9 +21,15 @@ import (
"net"
"net/http"
"net/url"
+ "strconv"
"strings"
)
+type IpPort struct {
+ IP string
+ Port uint16
+}
+
func GetIPFromContext(ctx context.Context) string {
v, ok := FromContext(ctx, "x-remote-ip").(string)
if !ok {
@@ -56,6 +62,15 @@ func ParseEndpoint(ep string) (string, error) {
return u.Hostname(), nil
}
+func ParseIpPort(addr string) IpPort {
+ idx := strings.LastIndex(addr, ":")
+ if idx == -1 {
+ return IpPort{addr, 0}
+ }
+ p, _ := strconv.Atoi(addr[idx+1:])
+ return IpPort{addr[:idx], uint16(p)}
+}
+
func GetRealIP(r *http.Request) string {
for _, h := range [2]string{"X-Forwarded-For", "X-Real-Ip"} {
addresses := strings.Split(r.Header.Get(h), ",")
@@ -75,22 +90,6 @@ func GetRealIP(r *http.Request) string {
return ""
}
-func GetLocalIP() string {
- addrs, err := net.InterfaceAddrs()
- if err != nil {
- return ""
- }
- for _, address := range addrs {
- // check the address type and if it is not a loopback the
display it
- if ipnet, ok := address.(*net.IPNet); ok &&
!ipnet.IP.IsLoopback() {
- if ipnet.IP.To4() != nil {
- return ipnet.IP.String()
- }
- }
- }
- return ""
-}
-
func InetNtoIP(ipnr uint32) net.IP {
return net.IPv4(byte(ipnr>>24), byte(ipnr>>16), byte(ipnr>>8),
byte(ipnr))
}
diff --git a/pkg/util/net_test.go b/pkg/util/net_test.go
index b61b99d..c148406 100644
--- a/pkg/util/net_test.go
+++ b/pkg/util/net_test.go
@@ -56,3 +56,14 @@ func TestInetNtoa(t *testing.T) {
fail(t, "InetNtoa(%d) error", n3)
}
}
+
+func TestParseIpPort(t *testing.T) {
+ ipPort := ParseIpPort("0.0.0.0")
+ if ipPort.IP != "0.0.0.0" || ipPort.Port != 0 {
+ fail(t, "ParseIpPort(0.0.0.0) error", n3)
+ }
+ ipPort = ParseIpPort("0.0.0.0:1")
+ if ipPort.IP != "0.0.0.0" || ipPort.Port != 1 {
+ fail(t, "ParseIpPort(0.0.0.0) error", n3)
+ }
+}
diff --git a/pkg/validate/reflect.go b/pkg/util/reflect.go
similarity index 81%
rename from pkg/validate/reflect.go
rename to pkg/util/reflect.go
index 18f69b6..a5770f0 100644
--- a/pkg/validate/reflect.go
+++ b/pkg/util/reflect.go
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package validate
+package util
import (
"reflect"
@@ -26,21 +26,21 @@ var reflector *Reflector
func init() {
reflector = &Reflector{
- types: make(map[*uintptr]*StructType, 100),
+ types: make(map[*uintptr]StructType, 100),
}
}
type StructType struct {
Type reflect.Type
- Fields []*reflect.StructField
+ Fields []reflect.StructField
}
type Reflector struct {
- types map[*uintptr]*StructType
+ types map[*uintptr]StructType
mux sync.RWMutex
}
-func (r *Reflector) Load(obj interface{}) *StructType {
+func (r *Reflector) Load(obj interface{}) StructType {
r.mux.RLock()
itab := *(**uintptr)(unsafe.Pointer(&obj))
t, ok := r.types[itab]
@@ -55,16 +55,16 @@ func (r *Reflector) Load(obj interface{}) *StructType {
r.mux.Unlock()
return t
}
- t = &StructType{
+ t = StructType{
Type: reflect.TypeOf(obj),
}
fl := t.Type.NumField()
if fl > 0 {
- t.Fields = make([]*reflect.StructField, fl)
+ t.Fields = make([]reflect.StructField, fl)
for i := 0; i < fl; i++ {
f := t.Type.Field(i)
- t.Fields[i] = &f
+ t.Fields[i] = f
}
}
r.types[itab] = t
@@ -72,6 +72,6 @@ func (r *Reflector) Load(obj interface{}) *StructType {
return t
}
-func LoadStruct(obj interface{}) *StructType {
+func LoadStruct(obj interface{}) StructType {
return reflector.Load(obj)
}
diff --git a/server/interceptor/ratelimiter/ratelimiter_suite_test.go
b/pkg/util/reflect_test.go
similarity index 52%
rename from server/interceptor/ratelimiter/ratelimiter_suite_test.go
rename to pkg/util/reflect_test.go
index af2baf8..ae0dfa5 100644
--- a/server/interceptor/ratelimiter/ratelimiter_suite_test.go
+++ b/pkg/util/reflect_test.go
@@ -14,16 +14,47 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package ratelimiter_test
+package util
import (
- . "github.com/onsi/ginkgo"
- . "github.com/onsi/gomega"
-
+ "fmt"
"testing"
)
-func TestNet(t *testing.T) {
- RegisterFailHandler(Fail)
- RunSpecs(t, "HttpLimiter Suite")
+type testStru struct { // fixture passed to LoadStruct by TestLoadStruct and BenchmarkLoadStruct
+ f1 int
+ f2 string
+ f3 testField // struct-valued field
+ f4 *testField // pointer-to-struct field
+}
+
+type testField struct { // empty struct used only as a field type of testStru
+}
+
+// TestLoadStruct checks the type name and field count LoadStruct reports for
+// testStru. The second iteration repeats the call for the same type so that
+// a repeated lookup is verified as well, not just executed.
+func TestLoadStruct(t *testing.T) {
+	for i := 0; i < 2; i++ {
+		v := LoadStruct(testStru{})
+		if v.Type.String() != "util.testStru" {
+			fail(t, "TestLoadStruct failed, %s != 'testStru'", v.Type.String())
+		}
+		if len(v.Fields) != 4 {
+			fail(t, "TestLoadStruct failed, wrong count of fields")
+		}
+		for _, f := range v.Fields {
+			fmt.Println(f.Name, f.Type.String())
+		}
+	}
+}
+
+// BenchmarkLoadStruct measures LoadStruct under parallel callers that all
+// look up the same struct type, and reports allocations per operation.
+func BenchmarkLoadStruct(b *testing.B) {
+	b.ReportAllocs()
+	b.RunParallel(func(pb *testing.PB) {
+		for pb.Next() {
+			LoadStruct(testStru{})
+		}
+	})
+	// 20000000 86.9 ns/op 32 B/op 1 allocs/op
}
diff --git a/pkg/util/sys.go b/pkg/util/sys.go
index 8a9a23d..3873f87 100644
--- a/pkg/util/sys.go
+++ b/pkg/util/sys.go
@@ -37,3 +37,12 @@ func PathExist(path string) bool {
_, err := os.Stat(path)
return err == nil || os.IsExist(err)
}
+
+// HostName returns the name reported by os.Hostname, or "UNKNOWN" on error.
+func HostName() (hostname string) {
+	name, err := os.Hostname()
+	if err != nil {
+		return "UNKNOWN"
+	}
+	return name
+}
diff --git a/pkg/util/tree_test.go b/pkg/util/tree_test.go
index 64e2339..79cb2c8 100644
--- a/pkg/util/tree_test.go
+++ b/pkg/util/tree_test.go
@@ -1,7 +1,6 @@
package util
import (
- "fmt"
"reflect"
"testing"
)
@@ -31,7 +30,6 @@ func TestTree(t *testing.T) {
testTree.InOrderTraversal(testTree.GetRoot(), handle)
if !reflect.DeepEqual(slice, targetSlice) {
- fmt.Printf(`TestTree failed`)
- t.FailNow()
+ fail(t, `TestTree failed`)
}
}
diff --git a/pkg/util/uniqueue_test.go b/pkg/util/uniqueue_test.go
index c69319d..6dca00d 100644
--- a/pkg/util/uniqueue_test.go
+++ b/pkg/util/uniqueue_test.go
@@ -25,12 +25,6 @@ import (
"time"
)
-func fail(t *testing.T, format string, args ...interface{}) {
- fmt.Printf(format, args...)
- fmt.Println()
- t.FailNow()
-}
-
func TestNewUniQueue(t *testing.T) {
_, err := newUniQueue(0)
if err == nil {
diff --git a/pkg/util/util.go b/pkg/util/util.go
index 554af47..d63191a 100644
--- a/pkg/util/util.go
+++ b/pkg/util/util.go
@@ -201,3 +201,12 @@ func FormatFuncName(f string) string {
func FuncName(f interface{}) string {
return runtime.FuncForPC(reflect.ValueOf(f).Pointer()).Name()
}
+
+// SliceHave reports whether str is equal to at least one element of arr.
+func SliceHave(arr []string, str string) bool {
+	found := false
+	for i := 0; i < len(arr) && !found; i++ {
+		found = arr[i] == str
+	}
+	return found
+}
diff --git a/pkg/util/util_test.go b/pkg/util/util_test.go
index 16f342f..3a54c36 100644
--- a/pkg/util/util_test.go
+++ b/pkg/util/util_test.go
@@ -17,9 +17,16 @@
package util
import (
+ "fmt"
"testing"
)
+// fail prints the formatted message on its own line and aborts the test.
+func fail(t *testing.T, format string, args ...interface{}) {
+	fmt.Println(fmt.Sprintf(format, args...))
+	t.FailNow()
+}
+
func TestBytesToInt32(t *testing.T) {
bs := []byte{0, 0, 0, 1}
i := BytesToInt32(bs)
diff --git a/pkg/validate/validate.go b/pkg/validate/validate.go
index 65fcc20..3a36f31 100644
--- a/pkg/validate/validate.go
+++ b/pkg/validate/validate.go
@@ -208,7 +208,7 @@ func (v *Validator) Validate(s interface{}) error {
return errors.New("not support validate type")
}
- st := LoadStruct(s)
+ st := util.LoadStruct(s)
for i, l := 0, sv.NumField(); i < l; i++ {
field := sv.Field(i)
fieldName := st.Fields[i].Name
diff --git a/server/bootstrap/bootstrap.go b/server/bootstrap/bootstrap.go
index b92c45a..058d034 100644
--- a/server/bootstrap/bootstrap.go
+++ b/server/bootstrap/bootstrap.go
@@ -55,7 +55,6 @@ import (
"github.com/apache/incubator-servicecomb-service-center/server/interceptor"
"github.com/apache/incubator-servicecomb-service-center/server/interceptor/access"
"github.com/apache/incubator-servicecomb-service-center/server/interceptor/cors"
-
"github.com/apache/incubator-servicecomb-service-center/server/interceptor/ratelimiter"
)
func init() {
@@ -63,7 +62,6 @@ func init() {
// intercept requests before routing.
interceptor.RegisterInterceptFunc(access.Intercept)
- interceptor.RegisterInterceptFunc(ratelimiter.Intercept)
interceptor.RegisterInterceptFunc(cors.Intercept)
// handle requests after routing.
diff --git a/server/interceptor/ratelimiter/limiter.go
b/server/interceptor/ratelimiter/limiter.go
deleted file mode 100644
index 5845b78..0000000
--- a/server/interceptor/ratelimiter/limiter.go
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ratelimiter
-
-import (
- "errors"
- "github.com/apache/incubator-servicecomb-service-center/pkg/httplimiter"
- "github.com/apache/incubator-servicecomb-service-center/pkg/util"
- "github.com/apache/incubator-servicecomb-service-center/server/core"
- "net/http"
- "strings"
- "sync"
- "time"
-)
-
-type Limiter struct {
- conns int64
-
- httpLimiter *httplimiter.HttpLimiter
-}
-
-var limiter *Limiter
-var mux sync.Mutex
-
-func GetLimiter() *Limiter {
- if limiter == nil {
- mux.Lock()
- if limiter == nil {
- limiter = new(Limiter)
- limiter.LoadConfig()
- }
- mux.Unlock()
- }
-
- return limiter
-}
-
-func (this *Limiter) LoadConfig() {
- ttl := time.Second
- switch core.ServerInfo.Config.LimitTTLUnit {
- case "ms":
- ttl = time.Millisecond
- case "m":
- ttl = time.Minute
- case "h":
- ttl = time.Hour
- }
- this.conns = core.ServerInfo.Config.LimitConnections
- this.httpLimiter = httplimiter.NewHttpLimiter(this.conns, ttl)
- iplookups := core.ServerInfo.Config.LimitIPLookup
- this.httpLimiter.IPLookups = strings.Split(iplookups, ",")
-
- util.Logger().Warnf(nil, "Rate-limit Load config, ttl: %s, conns: %d,
iplookups: %s", ttl, this.conns, iplookups)
-}
-
-func (this *Limiter) Handle(w http.ResponseWriter, r *http.Request) error {
- if this.conns <= 0 {
- return nil
- }
-
- httplimiter.SetResponseHeaders(this.httpLimiter, w)
- httpError := httplimiter.LimitByRequest(this.httpLimiter, r)
- if httpError != nil {
- w.Header().Add("Content-Type", this.httpLimiter.ContentType)
- w.WriteHeader(httpError.StatusCode)
- w.Write(util.StringToBytesWithNoCopy(httpError.Message))
- util.Logger().Warnf(nil, "Reached maximum request limit for %s
host and %s url", r.RemoteAddr, r.RequestURI)
- return errors.New(httpError.Message)
- }
- return nil
-}
-
-func Intercept(w http.ResponseWriter, r *http.Request) error {
- return GetLimiter().Handle(w, r)
-}
diff --git a/server/interceptor/ratelimiter/limiter_test.go
b/server/interceptor/ratelimiter/limiter_test.go
deleted file mode 100644
index eb8a752..0000000
--- a/server/interceptor/ratelimiter/limiter_test.go
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package ratelimiter
-
-import (
- "github.com/didip/tollbooth"
- . "github.com/onsi/ginkgo"
- . "github.com/onsi/gomega"
- "net/http"
- "net/http/httptest"
- "time"
-)
-
-var _ = Describe("HttpLimiter", func() {
- var (
- limiter *Limiter
- )
-
- BeforeEach(func() {
- limiter = new(Limiter)
- limiter.LoadConfig()
- })
- Describe("LoadConfig", func() {
- Context("Normal", func() {
- It("should be ok", func() {
- Expect(limiter.conns).To(Equal(int64(0)))
- res := []string{"RemoteAddr",
"X-Forwarded-For", "X-Real-IP"}
- for i, val := range
limiter.httpLimiter.IPLookups {
- Expect(val).To(Equal(res[i]))
- }
- })
- })
- })
- Describe("FuncHandler", func() {
- var ts *httptest.Server =
httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r
*http.Request) {
- err := limiter.Handle(w, r)
- if err != nil {
- return
- }
- w.WriteHeader(http.StatusOK)
- w.Write([]byte("Testing..."))
- }))
- Context("Connections > 0", func() {
- It("should not be router", func() {
- limiter.conns = 1
- limiter.httpLimiter = tollbooth.NewLimiter(1,
time.Second)
- resp, err := http.Get(ts.URL)
- Expect(err).To(BeNil())
- Expect(resp.StatusCode).To(Equal(http.StatusOK))
-
- resp, err = http.Get(ts.URL)
- Expect(err).To(BeNil())
-
Expect(resp.StatusCode).ToNot(Equal(http.StatusOK))
- })
- })
- Context("Connections <= 0", func() {
- It("should be router", func() {
- limiter.conns = 0
- limiter.httpLimiter = tollbooth.NewLimiter(0,
time.Second)
- resp, err := http.Get(ts.URL)
- Expect(err).To(BeNil())
- Expect(resp.StatusCode).To(Equal(http.StatusOK))
- resp, err = http.Get(ts.URL)
- Expect(err).To(BeNil())
- Expect(resp.StatusCode).To(Equal(http.StatusOK))
- })
- })
- })
-})
diff --git a/server/server.go b/server/server.go
index c71a98c..a546879 100644
--- a/server/server.go
+++ b/server/server.go
@@ -155,10 +155,9 @@ func (s *ServiceCenterServer) startApiServer() {
restPort := beego.AppConfig.String("httpport")
rpcIp := beego.AppConfig.DefaultString("rpcaddr", "")
rpcPort := beego.AppConfig.DefaultString("rpcport", "")
- cmpName := core.ServerInfo.Config.LoggerName
- hostName := fmt.Sprintf("%s_%s", cmpName,
strings.Replace(util.GetLocalIP(), ".", "_", -1))
- s.apiServer.HostName = hostName
+ s.apiServer.HostName = fmt.Sprintf("%s_%s",
core.ServerInfo.Config.LoggerName,
+ strings.Replace(restIp, ".", "_", -1))
s.addEndpoint(REST, restIp, restPort)
s.addEndpoint(RPC, rpcIp, rpcPort)
s.apiServer.Start()
diff --git a/server/service/instances.go b/server/service/instances.go
index a452967..7008667 100644
--- a/server/service/instances.go
+++ b/server/service/instances.go
@@ -96,7 +96,8 @@ func (s *InstanceService) Register(ctx context.Context, in
*pb.RegisterInstanceR
}, err
}
if oldInstanceId != "" {
- util.Logger().Infof("instance more exist.")
+ util.Logger().Infof("register instance
successful, reuse service %s instance %s, operator %s",
+ instance.ServiceId, oldInstanceId,
remoteIP)
return &pb.RegisterInstanceResponse{
Response:
pb.CreateResponse(pb.Response_SUCCESS, "instance more exist."),
InstanceId: oldInstanceId,
diff --git a/server/service/microservices.go b/server/service/microservices.go
index 8514333..0e3e444 100644
--- a/server/service/microservices.go
+++ b/server/service/microservices.go
@@ -409,10 +409,16 @@ func (s *MicroServiceService) DeleteServices(ctx
context.Context, request *pb.De
}
util.Logger().Infof("Batch DeleteServices serviceId = %v , result = %d,
", request.ServiceIds, responseCode)
- return &pb.DelServicesResponse{
- Response: pb.CreateResponse(responseCode, "Delete services
successfully."),
+
+ resp := &pb.DelServicesResponse{
Services: delServiceRspInfo,
- }, nil
+ }
+ if responseCode != pb.Response_SUCCESS {
+ resp.Response = pb.CreateResponse(responseCode, "Delete
services failed.")
+ } else {
+ resp.Response = pb.CreateResponse(responseCode, "Delete
services successfully.")
+ }
+ return resp, nil
}
func (s *MicroServiceService) GetOne(ctx context.Context, in
*pb.GetServiceRequest) (*pb.GetServiceResponse, error) {
diff --git a/server/service/util/instance_util.go
b/server/service/util/instance_util.go
index 117ce38..e40fdae 100644
--- a/server/service/util/instance_util.go
+++ b/server/service/util/instance_util.go
@@ -79,7 +79,6 @@ func GetAllInstancesOfOneService(ctx context.Context,
domainProject string, serv
instances := make([]*pb.MicroServiceInstance, 0, len(resp.Kvs))
for _, kvs := range resp.Kvs {
- util.Logger().Debugf("start unmarshal service instance file:
%s", util.BytesToStringWithNoCopy(kvs.Key))
instance := &pb.MicroServiceInstance{}
err := json.Unmarshal(kvs.Value, instance)
if err != nil {
@@ -159,15 +158,6 @@ func ParseEndpointValue(value []byte) EndpointValue {
return endpointValue
}
-func isContain(endpoints []string, endpoint string) bool {
- for _, tmpEndpoint := range endpoints {
- if tmpEndpoint == endpoint {
- return true
- }
- }
- return false
-}
-
func DeleteServiceAllInstances(ctx context.Context, serviceId string) error {
domainProject := util.ParseDomainProject(ctx)
@@ -234,8 +224,6 @@ func QueryAllProvidersInstances(ctx context.Context,
selfServiceId string) (resu
util.Logger().Debugf("query provider service %s instances[%d]
with revision %d.", providerId, len(kvs), rev)
for _, kv := range kvs {
- util.Logger().Debugf("start unmarshal service instance
file with revision %d: %s",
- rev, util.BytesToStringWithNoCopy(kv.Key))
instance := &pb.MicroServiceInstance{}
err := json.Unmarshal(kv.Value, instance)
if err != nil {
diff --git a/server/service/util/instance_util_test.go
b/server/service/util/instance_util_test.go
index 5bba56c..5da480c 100644
--- a/server/service/util/instance_util_test.go
+++ b/server/service/util/instance_util_test.go
@@ -97,18 +97,6 @@ func TestCheckEndPoints(t *testing.T) {
fmt.Printf(`CheckEndPoints failed`)
t.FailNow()
}
-
- b := isContain([]string{"a"}, "a")
- if !b {
- fmt.Printf(`isContain contain failed`)
- t.FailNow()
- }
-
- b = isContain([]string{}, "a")
- if b {
- fmt.Printf(`isContain empty failed`)
- t.FailNow()
- }
}
func TestDeleteServiceAllInstances(t *testing.T) {
diff --git a/server/service/util/rule_util.go b/server/service/util/rule_util.go
index b8b0774..2ecabc0 100644
--- a/server/service/util/rule_util.go
+++ b/server/service/util/rule_util.go
@@ -79,7 +79,6 @@ func GetRulesUtil(ctx context.Context, domainProject string,
serviceId string) (
rules := []*pb.ServiceRule{}
for _, kvs := range resp.Kvs {
- util.Logger().Debugf("start unmarshal service rule file: %s",
util.BytesToStringWithNoCopy(kvs.Key))
rule := &pb.ServiceRule{}
err := json.Unmarshal(kvs.Value, rule)
if err != nil {
--
To stop receiving notification emails like this one, please contact
[email protected].