little-cui closed pull request #267: SCB-321 Support report trace data
URL: https://github.com/apache/incubator-servicecomb-service-center/pull/267
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/tracing-file.PNG b/docs/tracing-file.PNG new file mode 100644 index 00000000..e6ee116a Binary files /dev/null and b/docs/tracing-file.PNG differ diff --git a/docs/tracing-server.PNG b/docs/tracing-server.PNG new file mode 100644 index 00000000..70b7c0c4 Binary files /dev/null and b/docs/tracing-server.PNG differ diff --git a/etc/conf/app.conf b/etc/conf/app.conf index a48f08c5..a4fb8dc8 100644 --- a/etc/conf/app.conf +++ b/etc/conf/app.conf @@ -47,12 +47,20 @@ cipher_plugin = "" #suppot buildin, unlimit quota_plugin = "" -# access control plugin +#access control plugin auth_plugin = "" #support om, manage auditlog_plugin = "" +#tracing: buildin(zipkin) +# buildin(zipkin): Can export TRACING_COLLECTOR env variable to select +# collector type, 'server' means report trace data +# to zipkin server address specified by TRACING_SERVER_ADDRESS +# env variable; 'file' means just output a file stored +# in path specified by TRACING_FILE_PATH env variable +trace_plugin = "" + ################################################################### # rate limit options ################################################################### diff --git a/pkg/chain/callback.go b/pkg/chain/callback.go index 69be2ce4..9b2e7fa2 100644 --- a/pkg/chain/callback.go +++ b/pkg/chain/callback.go @@ -17,7 +17,6 @@ package chain import ( - "fmt" "github.com/apache/incubator-servicecomb-service-center/pkg/util" ) @@ -31,28 +30,33 @@ func (r Result) String() string { if r.OK { return "OK" } - return fmt.Sprintf("FAIL(error: %s)", r.Err) + return r.Err.Error() } type Callback struct { - Func func(r Result) + Func func(r Result) + Async bool } func (cb *Callback) Invoke(r Result) { - go cb.syncInvoke(r) + if cb.Async { + go 
syncInvoke(cb.Func, r) + return + } + syncInvoke(cb.Func, r) } -func (cb *Callback) syncInvoke(r Result) { +func syncInvoke(f func(r Result), r Result) { defer func() { if itf := recover(); itf != nil { util.LogPanic(itf) } }() - if cb.Func == nil { + if f == nil { util.Logger().Errorf(nil, "Callback function is nil. result: %s,", r) return } - cb.Func(r) + f(r) } func (cb *Callback) Fail(err error, args ...interface{}) { diff --git a/pkg/chain/chain.go b/pkg/chain/chain.go index a48fc1c5..8cfb4dd9 100644 --- a/pkg/chain/chain.go +++ b/pkg/chain/chain.go @@ -30,10 +30,7 @@ type Chain struct { func (c *Chain) Init(chainName string, hs []Handler) { c.name = chainName c.currentIndex = -1 - if len(hs) > 0 { - c.handlers = make([]Handler, len(hs)) - copy(c.handlers, hs) - } + c.handlers = hs } func (c *Chain) Name() string { @@ -63,8 +60,7 @@ func (c *Chain) Next(i *Invocation) { c.syncNext(i) } -func NewChain(name string, handlers []Handler) Chain { - var ch Chain +func NewChain(name string, handlers []Handler) (ch Chain) { ch.Init(name, handlers) return ch } diff --git a/pkg/chain/chain_test.go b/pkg/chain/chain_test.go index 8d67fa8b..3e5b8c01 100644 --- a/pkg/chain/chain_test.go +++ b/pkg/chain/chain_test.go @@ -22,53 +22,51 @@ import ( "testing" ) +const ( + times = 1000000 + count = 100 +) + +func init() { + for i := 0; i < count; i++ { + chain.RegisterHandler("_bench_handlers_", &handler{}) + } +} + type handler struct { } func (h *handler) Handle(i *chain.Invocation) { - counter := i.Context().Value("counter").(int) - counter++ - i.WithContext("counter", counter) i.Next() } -func syncFunc(ctx map[string]interface{}) { - counter := ctx["counter"].(int) - counter++ - ctx["counter"] = counter +func syncFunc() { + for i := 0; i < count; i++ { + } } func BenchmarkChain(b *testing.B) { - b.N = 100 - if len(chain.Handlers("_bench_handlers_")) == 0 { - for i := 0; i < b.N; i++ { - chain.RegisterHandler("_bench_handlers_", &handler{}) - } - } + var ( + ctx = 
context.Background() + f = func(r chain.Result) {} + ) + b.N = times b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { - ch := make(chan struct{}) - inv := chain.NewInvocation(context.Background(), - chain.NewChain("_bench_chain_", chain.Handlers("_bench_handlers_"))) - inv.WithContext("counter", 0) - inv.Invoke(func(r chain.Result) { close(ch) }) - <-ch + inv := chain.NewInvocation(ctx, chain.NewChain("_bench_chain_", chain.Handlers("_bench_handlers_"))) + inv.Next(chain.WithFunc(f)) } }) b.ReportAllocs() } func BenchmarkSync(b *testing.B) { - b.N = 100 + b.N = times b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { - ctx := make(map[string]interface{}, 1) - ctx["counter"] = 0 - for i := 0; i < b.N; i++ { - syncFunc(ctx) - } + syncFunc() } }) b.ReportAllocs() diff --git a/pkg/chain/invocation.go b/pkg/chain/invocation.go index 173cb7f5..0ab0df7b 100644 --- a/pkg/chain/invocation.go +++ b/pkg/chain/invocation.go @@ -21,6 +21,20 @@ import ( "golang.org/x/net/context" ) +type InvocationOption func(op InvocationOp) InvocationOp + +type InvocationOp struct { + Func func(r Result) + Async bool +} + +func WithFunc(f func(r Result)) InvocationOption { + return func(op InvocationOp) InvocationOp { op.Func = f; return op } +} +func WithAsyncFunc(f func(r Result)) InvocationOption { + return func(op InvocationOp) InvocationOp { op.Func = f; op.Async = true; return op } +} + type Invocation struct { Callback context *util.StringContext @@ -41,17 +55,45 @@ func (i *Invocation) WithContext(key string, val interface{}) *Invocation { return i } -func (i *Invocation) Next() { +func (i *Invocation) Next(opts ...InvocationOption) { + var op InvocationOp + for _, opt := range opts { + op = opt(op) + } + + i.setCallback(op.Func, op.Async) i.chain.Next(i) } +func (i *Invocation) setCallback(f func(r Result), async bool) { + if f == nil { + return + } + + if i.Func == nil { + i.Func = f + i.Async = async + return + } + + cb := i.Func + i.Func = func(r 
Result) { + cb(r) + callback(f, async, r) + } +} + +func callback(f func(r Result), async bool, r Result) { + c := Callback{Func: f, Async: async} + c.Invoke(r) +} + func (i *Invocation) Invoke(f func(r Result)) { i.Func = f - i.Next() + i.chain.Next(i) } -func NewInvocation(ctx context.Context, ch Chain) Invocation { - var inv Invocation +func NewInvocation(ctx context.Context, ch Chain) (inv Invocation) { inv.Init(ctx, ch) return inv } diff --git a/pkg/rest/common.go b/pkg/rest/common.go index 4e135fbf..10104942 100644 --- a/pkg/rest/common.go +++ b/pkg/rest/common.go @@ -29,6 +29,7 @@ const ( CTX_RESPONSE = "_server_response" CTX_REQUEST = "_server_request" CTX_MATCH_PATTERN = "_server_match_pattern" + CTX_MATCH_FUNC = "_server_match_func" SERVER_CHAIN_NAME = "_server_chain" ) diff --git a/pkg/rest/route.go b/pkg/rest/route.go index 3015affa..e3eb7c51 100644 --- a/pkg/rest/route.go +++ b/pkg/rest/route.go @@ -33,6 +33,7 @@ type URLPattern struct { } type urlPatternHandler struct { + Name string Path string http.Handler } @@ -68,7 +69,8 @@ func (this *ROAServerHandler) addRoute(route *Route) (err error) { return errors.New(message) } - this.handlers[method] = append(this.handlers[method], &urlPatternHandler{route.Path, http.HandlerFunc(route.Func)}) + this.handlers[method] = append(this.handlers[method], &urlPatternHandler{ + util.FormatFuncName(util.FuncName(route.Func)), route.Path, http.HandlerFunc(route.Func)}) util.Logger().Infof("register route %s(%s).", route.Path, method) return nil @@ -121,14 +123,13 @@ func (this *ROAServerHandler) serve(ph *urlPatternHandler, w http.ResponseWriter *r = *nr } - ch := make(chan struct{}) inv := chain.NewInvocation(ctx, chain.NewChain(SERVER_CHAIN_NAME, hs)) inv.WithContext(CTX_RESPONSE, w). WithContext(CTX_REQUEST, r). - WithContext(CTX_MATCH_PATTERN, ph.Path) - inv.Invoke(func(ret chain.Result) { + WithContext(CTX_MATCH_PATTERN, ph.Path). 
+ WithContext(CTX_MATCH_FUNC, ph.Name) + inv.Next(chain.WithFunc(func(ret chain.Result) { defer func() { - defer close(ch) err := ret.Err itf := recover() if itf != nil { @@ -148,8 +149,7 @@ func (this *ROAServerHandler) serve(ph *urlPatternHandler, w http.ResponseWriter if ret.OK { ph.ServeHTTP(w, r) } - }) - <-ch + })) } func (this *urlPatternHandler) try(path string) (p string, _ bool) { diff --git a/pkg/util/context.go b/pkg/util/context.go new file mode 100644 index 00000000..41b41e46 --- /dev/null +++ b/pkg/util/context.go @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package util + +import ( + "golang.org/x/net/context" + "net/http" + "time" +) + +type StringContext struct { + parentCtx context.Context + kv map[string]interface{} +} + +func (c *StringContext) Deadline() (deadline time.Time, ok bool) { + return c.parentCtx.Deadline() +} + +func (c *StringContext) Done() <-chan struct{} { + return c.parentCtx.Done() +} + +func (c *StringContext) Err() error { + return c.parentCtx.Err() +} + +func (c *StringContext) Value(key interface{}) interface{} { + k, ok := key.(string) + if !ok { + return c.parentCtx.Value(key) + } + return c.kv[k] +} + +func (c *StringContext) SetKV(key string, val interface{}) { + c.kv[key] = val +} + +func NewStringContext(ctx context.Context) *StringContext { + strCtx, ok := ctx.(*StringContext) + if !ok { + strCtx = &StringContext{ + parentCtx: ctx, + kv: make(map[string]interface{}, 10), + } + } + return strCtx +} + +func SetContext(ctx context.Context, key string, val interface{}) context.Context { + strCtx := NewStringContext(ctx) + strCtx.SetKV(key, val) + return strCtx +} + +func CloneContext(ctx context.Context) context.Context { + strCtx := &StringContext{ + parentCtx: ctx, + kv: make(map[string]interface{}, 10), + } + + old, ok := ctx.(*StringContext) + if !ok { + return strCtx + } + + for k, v := range old.kv { + strCtx.kv[k] = v + } + return strCtx +} + +func FromContext(ctx context.Context, key string) interface{} { + return ctx.Value(key) +} + +func SetRequestContext(r *http.Request, key string, val interface{}) *http.Request { + ctx := r.Context() + ctx = SetContext(ctx, key, val) + if ctx != r.Context() { + nr := r.WithContext(ctx) + *r = *nr + } + return r +} + +func ParseDomainProject(ctx context.Context) string { + return ParseDomain(ctx) + "/" + ParseProject(ctx) +} + +func ParseTargetDomainProject(ctx context.Context) string { + return ParseTargetDomain(ctx) + "/" + ParseTargetProject(ctx) +} + +func ParseDomain(ctx context.Context) string { + v, ok := FromContext(ctx, 
"domain").(string) + if !ok { + return "" + } + return v +} + +func ParseTargetDomain(ctx context.Context) string { + v, _ := FromContext(ctx, "target-domain").(string) + if len(v) == 0 { + return ParseDomain(ctx) + } + return v +} + +func ParseProject(ctx context.Context) string { + v, ok := FromContext(ctx, "project").(string) + if !ok { + return "" + } + return v +} + +func ParseTargetProject(ctx context.Context) string { + v, _ := FromContext(ctx, "target-project").(string) + if len(v) == 0 { + return ParseProject(ctx) + } + return v +} + +func SetDomain(ctx context.Context, domain string) context.Context { + return SetContext(ctx, "domain", domain) +} + +func SetProject(ctx context.Context, project string) context.Context { + return SetContext(ctx, "project", project) +} + +func SetTargetDomain(ctx context.Context, domain string) context.Context { + return SetContext(ctx, "target-domain", domain) +} + +func SetTargetProject(ctx context.Context, project string) context.Context { + return SetContext(ctx, "target-project", project) +} + +func SetDomainProject(ctx context.Context, domain string, project string) context.Context { + return SetProject(SetDomain(ctx, domain), project) +} + +func SetTargetDomainProject(ctx context.Context, domain string, project string) context.Context { + return SetTargetProject(SetTargetDomain(ctx, domain), project) +} diff --git a/pkg/util/net.go b/pkg/util/net.go new file mode 100644 index 00000000..d73e928c --- /dev/null +++ b/pkg/util/net.go @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package util + +import ( + "golang.org/x/net/context" + "net" + "net/http" + "net/url" + "strings" +) + +func GetIPFromContext(ctx context.Context) string { + v, ok := FromContext(ctx, "x-remote-ip").(string) + if !ok { + return "" + } + return v +} + +func UrlEncode(keys map[string]string) string { + l := len(keys) + if l == 0 { + return "" + } + arr := make([]string, 0, l) + for k, v := range keys { + arr = append(arr, url.QueryEscape(k)+"="+url.QueryEscape(v)) + } + return StringJoin(arr, "&") +} + +func ParseEndpoint(ep string) (string, error) { + u, err := url.Parse(ep) + if err != nil { + return "", err + } + port := u.Port() + if len(port) > 0 { + return u.Hostname() + ":" + port, nil + } + return u.Hostname(), nil +} + +func GetRealIP(r *http.Request) string { + for _, h := range [2]string{"X-Forwarded-For", "X-Real-Ip"} { + addresses := strings.Split(r.Header.Get(h), ",") + for _, ip := range addresses { + ip = strings.TrimSpace(ip) + realIP := net.ParseIP(ip) + if !realIP.IsGlobalUnicast() { + continue + } + return ip + } + } + addrs := strings.Split(r.RemoteAddr, ":") + if len(addrs) > 0 { + return addrs[0] + } + return "" +} + +func GetLocalIP() string { + addrs, err := net.InterfaceAddrs() + if err != nil { + return "" + } + for _, address := range addrs { + // check the address type and if it is not a loopback the display it + if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { + if ipnet.IP.To4() != nil { + return ipnet.IP.String() + } + } + } + return "" +} + +func InetNtoIP(ipnr uint32) net.IP { + return 
net.IPv4(byte(ipnr>>24), byte(ipnr>>16), byte(ipnr>>8), byte(ipnr)) +} + +func InetNtoa(ipnr uint32) string { + return InetNtoIP(ipnr).String() +} + +func InetAton(ip string) (ipnr uint32) { + bytes := net.ParseIP(ip).To4() + for i := 0; i < len(bytes); i++ { + ipnr |= uint32(bytes[i]) + if i < 3 { + ipnr <<= 8 + } + } + return +} diff --git a/pkg/util/net_test.go b/pkg/util/net_test.go new file mode 100644 index 00000000..b61b99d5 --- /dev/null +++ b/pkg/util/net_test.go @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package util + +import "testing" + +const ( + ip1 = "127.0.0.1" // 2130706433 + ip2 = "0.0.0.0" // 0 + ip3 = "255.255.255.255" // 4294967295 + n1 = 2130706433 // "127.0.0.1" + n2 = 0 // "0.0.0.0" + n3 = 4294967295 // "255.255.255.255" +) + +func TestInetAton(t *testing.T) { + i := InetAton(ip1) + if i != 2130706433 { + fail(t, "InetAton(%s) error", ip1) + } + i = InetAton(ip2) + if i != 0 { + fail(t, "InetAton(%s) error", ip2) + } + i = InetAton(ip3) + if i != 4294967295 { + fail(t, "InetAton(%s) error", ip3) + } +} + +func TestInetNtoa(t *testing.T) { + ip := InetNtoa(n1) + if ip != ip1 { + fail(t, "InetNtoa(%d) error", n1) + } + ip = InetNtoa(n2) + if ip != ip2 { + fail(t, "InetNtoa(%d) error", n2) + } + ip = InetNtoa(n3) + if ip != ip3 { + fail(t, "InetNtoa(%d) error", n3) + } +} diff --git a/pkg/util/sys.go b/pkg/util/sys.go index dae6651d..8a9a23d0 100644 --- a/pkg/util/sys.go +++ b/pkg/util/sys.go @@ -17,28 +17,12 @@ package util import ( - "net" + "os" "unsafe" ) const INT_SIZE int = int(unsafe.Sizeof(0)) -func GetLocalIP() string { - addrs, err := net.InterfaceAddrs() - if err != nil { - return "" - } - for _, address := range addrs { - // check the address type and if it is not a loopback the display it - if ipnet, ok := address.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { - if ipnet.IP.To4() != nil { - return ipnet.IP.String() - } - } - } - return "" -} - func IsBigEndian() bool { return !IsLittleEndian() } @@ -48,3 +32,8 @@ func IsLittleEndian() bool { bs := (*[INT_SIZE]byte)(unsafe.Pointer(&i)) return bs[0] == 0 } + +func PathExist(path string) bool { + _, err := os.Stat(path) + return err == nil || os.IsExist(err) +} diff --git a/pkg/util/util.go b/pkg/util/util.go index 17997a65..554af471 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -20,10 +20,6 @@ import ( "bytes" "encoding/gob" "fmt" - "golang.org/x/net/context" - "net" - "net/http" - "net/url" "os" "reflect" "runtime" @@ -33,11 +29,6 @@ import ( "unsafe" ) -func PathExist(path 
string) bool { - _, err := os.Stat(path) - return err == nil || os.IsExist(err) -} - func MinInt(x, y int) int { if x <= y { return x @@ -68,155 +59,6 @@ func ClearByteMemory(src []byte) { } } -type StringContext struct { - parentCtx context.Context - kv map[string]interface{} -} - -func (c *StringContext) Deadline() (deadline time.Time, ok bool) { - return c.parentCtx.Deadline() -} - -func (c *StringContext) Done() <-chan struct{} { - return c.parentCtx.Done() -} - -func (c *StringContext) Err() error { - return c.parentCtx.Err() -} - -func (c *StringContext) Value(key interface{}) interface{} { - k, ok := key.(string) - if !ok { - return c.parentCtx.Value(key) - } - return c.kv[k] -} - -func (c *StringContext) SetKV(key string, val interface{}) { - c.kv[key] = val -} - -func NewStringContext(ctx context.Context) *StringContext { - strCtx, ok := ctx.(*StringContext) - if !ok { - strCtx = &StringContext{ - parentCtx: ctx, - kv: make(map[string]interface{}, 10), - } - } - return strCtx -} - -func SetContext(ctx context.Context, key string, val interface{}) context.Context { - strCtx := NewStringContext(ctx) - strCtx.SetKV(key, val) - return strCtx -} - -func CloneContext(ctx context.Context) context.Context { - strCtx := &StringContext{ - parentCtx: ctx, - kv: make(map[string]interface{}, 10), - } - - old, ok := ctx.(*StringContext) - if !ok { - return strCtx - } - - for k, v := range old.kv { - strCtx.kv[k] = v - } - return strCtx -} - -func FromContext(ctx context.Context, key string) interface{} { - return ctx.Value(key) -} - -func SetRequestContext(r *http.Request, key string, val interface{}) *http.Request { - ctx := r.Context() - ctx = SetContext(ctx, key, val) - if ctx != r.Context() { - nr := r.WithContext(ctx) - *r = *nr - } - return r -} - -func ParseDomainProject(ctx context.Context) string { - return ParseDomain(ctx) + "/" + ParseProject(ctx) -} - -func ParseTargetDomainProject(ctx context.Context) string { - return ParseTargetDomain(ctx) + "/" + 
ParseTargetProject(ctx) -} - -func ParseDomain(ctx context.Context) string { - v, ok := FromContext(ctx, "domain").(string) - if !ok { - return "" - } - return v -} - -func ParseTargetDomain(ctx context.Context) string { - v, _ := FromContext(ctx, "target-domain").(string) - if len(v) == 0 { - return ParseDomain(ctx) - } - return v -} - -func ParseProject(ctx context.Context) string { - v, ok := FromContext(ctx, "project").(string) - if !ok { - return "" - } - return v -} - -func ParseTargetProject(ctx context.Context) string { - v, _ := FromContext(ctx, "target-project").(string) - if len(v) == 0 { - return ParseProject(ctx) - } - return v -} - -func SetDomain(ctx context.Context, domain string) context.Context { - return SetContext(ctx, "domain", domain) -} - -func SetProject(ctx context.Context, project string) context.Context { - return SetContext(ctx, "project", project) -} - -func SetTargetDomain(ctx context.Context, domain string) context.Context { - return SetContext(ctx, "target-domain", domain) -} - -func SetTargetProject(ctx context.Context, project string) context.Context { - return SetContext(ctx, "target-project", project) -} - -func SetDomainProject(ctx context.Context, domain string, project string) context.Context { - return SetProject(SetDomain(ctx, domain), project) -} - -func SetTargetDomainProject(ctx context.Context, domain string, project string) context.Context { - return SetTargetProject(SetTargetDomain(ctx, domain), project) -} - -func GetIPFromContext(ctx context.Context) string { - v, ok := FromContext(ctx, "x-remote-ip").(string) - if !ok { - return "" - } - return v -} - func DeepCopy(dst, src interface{}) error { var buf bytes.Buffer if err := gob.NewEncoder(&buf).Encode(src); err != nil { @@ -325,37 +167,6 @@ func GetCaller(skip int) (string, string, int, bool) { return file, method, line, ok } -func ParseEndpoint(ep string) (string, error) { - u, err := url.Parse(ep) - if err != nil { - return "", err - } - port := u.Port() - if 
len(port) > 0 { - return u.Hostname() + ":" + port, nil - } - return u.Hostname(), nil -} - -func GetRealIP(r *http.Request) string { - for _, h := range [2]string{"X-Forwarded-For", "X-Real-Ip"} { - addresses := strings.Split(r.Header.Get(h), ",") - for _, ip := range addresses { - ip = strings.TrimSpace(ip) - realIP := net.ParseIP(ip) - if !realIP.IsGlobalUnicast() { - continue - } - return ip - } - } - addrs := strings.Split(r.RemoteAddr, ":") - if len(addrs) > 0 { - return addrs[0] - } - return "" -} - func BytesToInt32(bs []byte) (in int32) { l := len(bs) if l > 4 || l == 0 { @@ -376,18 +187,6 @@ func BytesToInt32(bs []byte) (in int32) { return } -func UrlEncode(keys map[string]string) string { - l := len(keys) - if l == 0 { - return "" - } - arr := make([]string, 0, l) - for k, v := range keys { - arr = append(arr, url.QueryEscape(k)+"="+url.QueryEscape(v)) - } - return StringJoin(arr, "&") -} - func FormatFuncName(f string) string { i := strings.LastIndex(f, "/") j := strings.Index(f[i+1:], ".") diff --git a/server/bootstrap/bootstrap.go b/server/bootstrap/bootstrap.go index 5c50e651..b92c45ac 100644 --- a/server/bootstrap/bootstrap.go +++ b/server/bootstrap/bootstrap.go @@ -39,6 +39,9 @@ import _ "github.com/apache/incubator-servicecomb-service-center/server/plugin/i // uuid import _ "github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/uuid/buildin" +// tracing +import _ "github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/tracing/buildin" + // module import _ "github.com/apache/incubator-servicecomb-service-center/server/govern" @@ -48,6 +51,7 @@ import ( "github.com/apache/incubator-servicecomb-service-center/server/handler/cache" "github.com/apache/incubator-servicecomb-service-center/server/handler/context" "github.com/apache/incubator-servicecomb-service-center/server/handler/metric" + "github.com/apache/incubator-servicecomb-service-center/server/handler/tracing" 
"github.com/apache/incubator-servicecomb-service-center/server/interceptor" "github.com/apache/incubator-servicecomb-service-center/server/interceptor/access" "github.com/apache/incubator-servicecomb-service-center/server/interceptor/cors" @@ -64,6 +68,7 @@ func init() { // handle requests after routing. metric.RegisterHandlers() + tracing.RegisterHandlers() auth.RegisterHandlers() context.RegisterHandlers() cache.RegisterHandlers() diff --git a/server/handler/metric/metric.go b/server/handler/metric/metric.go index 18dcae4f..42d2f8a6 100644 --- a/server/handler/metric/metric.go +++ b/server/handler/metric/metric.go @@ -31,17 +31,14 @@ type MetricsHandler struct { func (h *MetricsHandler) Handle(i *chain.Invocation) { w, r := i.Context().Value(rest.CTX_RESPONSE).(http.ResponseWriter), i.Context().Value(rest.CTX_REQUEST).(*http.Request) - cb := i.Func - i.Invoke(func(ret chain.Result) { - cb(ret) - + i.Next(chain.WithAsyncFunc(func(ret chain.Result) { start, ok := i.Context().Value(svr.CTX_START_TIMESTAMP).(time.Time) if !ok { return } svr.ReportRequestCompleted(w, r, start) util.LogNilOrWarnf(start, "%s %s", r.Method, r.RequestURI) - }) + })) } func RegisterHandlers() { diff --git a/server/handler/tracing/tracing.go b/server/handler/tracing/tracing.go index caa04db9..09776c15 100644 --- a/server/handler/tracing/tracing.go +++ b/server/handler/tracing/tracing.go @@ -19,50 +19,29 @@ package tracing import ( "github.com/apache/incubator-servicecomb-service-center/pkg/chain" "github.com/apache/incubator-servicecomb-service-center/pkg/rest" - "github.com/apache/incubator-servicecomb-service-center/server/core" - "github.com/opentracing/opentracing-go" - "github.com/opentracing/opentracing-go/ext" - zipkin "github.com/openzipkin/zipkin-go-opentracing" - "github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore" + "github.com/apache/incubator-servicecomb-service-center/server/plugin" "net/http" + "strconv" ) -var tracer opentracing.Tracer - -func init() { - 
collector, err := zipkin.NewHTTPCollector("127.0.0.1:9411") - if err != nil { - return - } - recorder := zipkin.NewRecorder(collector, false, "0.0.0.0:0", core.Service.ServiceName) - tracer, err = zipkin.NewTracer(recorder, zipkin.TraceID128Bit(true)) -} - type TracingHandler struct { } func (h *TracingHandler) Handle(i *chain.Invocation) { - w, request := i.Context().Value(rest.CTX_RESPONSE).(http.ResponseWriter), - i.Context().Value(rest.CTX_REQUEST).(*http.Request) - ctx, err := tracer.Extract(opentracing.TextMap, opentracing.HTTPHeadersCarrier(request.Header)) - switch err { - case nil: - case opentracing.ErrSpanContextNotFound: - default: - } - - span := tracer.StartSpan("api", ext.RPCServerOption(ctx)) - ext.SpanKindRPCServer.Set(span) - - cb := i.Func - i.Invoke(func(r chain.Result) { - cb(r) - span.SetTag(zipkincore.HTTP_METHOD, request.Method) - span.SetTag(zipkincore.HTTP_PATH, request.RequestURI) - span.SetTag(zipkincore.HTTP_STATUS_CODE, w.Header().Get("X-Response-Status")) - span.SetTag(zipkincore.HTTP_HOST, request.URL.Host) - span.Finish() - }) + w, r, op := i.Context().Value(rest.CTX_RESPONSE).(http.ResponseWriter), + i.Context().Value(rest.CTX_REQUEST).(*http.Request), + i.Context().Value(rest.CTX_MATCH_FUNC).(string) + + span := plugin.Plugins().Tracing().ServerBegin(op, r) + + i.Next(chain.WithAsyncFunc(func(ret chain.Result) { + statusCode := w.Header().Get("X-Response-Status") + code, _ := strconv.ParseInt(statusCode, 10, 64) + if code == 0 { + code = 200 + } + plugin.Plugins().Tracing().ServerEnd(span, int(code), statusCode) + })) } func RegisterHandlers() { diff --git a/server/infra/tracing/tracing.go b/server/infra/tracing/tracing.go new file mode 100644 index 00000000..3e5d674a --- /dev/null +++ b/server/infra/tracing/tracing.go @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package tracing + +const CTX_TRACE_SPAN = "x-trace-span" + +type Request interface{} +type Span interface{} + +type Tracing interface { + ServerBegin(operationName string, r Request) Span + ServerEnd(span Span, code int, message string) + ClientBegin(operationName string, r Request) Span + ClientEnd(span Span, code int, message string) +} diff --git a/server/plugin/README.md b/server/plugin/README.md index 89ff610d..c11ed53d 100644 --- a/server/plugin/README.md +++ b/server/plugin/README.md @@ -10,6 +10,7 @@ 1. auditlog, Customize audit log for any change done to the service-center. 1. cipher, Customize encryption and decryption of TLS certificate private key password. 1. quota, Customize quota for instance registry. +1. tracing, Customize tracing data reporter. 
## Example: an authentication plug-in diff --git a/server/plugin/infra/registry/etcd/etcd.go b/server/plugin/infra/registry/etcd/etcd.go index 1e57ee1c..7ffe5a6b 100644 --- a/server/plugin/infra/registry/etcd/etcd.go +++ b/server/plugin/infra/registry/etcd/etcd.go @@ -324,13 +324,20 @@ func (c *EtcdClient) paging(ctx context.Context, op registry.PluginOp) (*clientv } func (c *EtcdClient) Do(ctx context.Context, opts ...registry.PluginOpOption) (*registry.PluginResponse, error) { + var ( + err error + resp *registry.PluginResponse + ) + start := time.Now() op := registry.OptionsToOp(opts...) + span := TracingBegin(ctx, "etcd:do", op) + defer TracingEnd(span, err) + otCtx, cancel := registry.WithTimeout(ctx) defer cancel() - var err error - var resp *registry.PluginResponse + switch op.Action { case registry.Get: var etcdResp *clientv3.GetResponse @@ -378,9 +385,11 @@ func (c *EtcdClient) Do(ctx context.Context, opts ...registry.PluginOpOption) (* Revision: etcdResp.Header.Revision, } } + if err != nil { return nil, err } + resp.Succeeded = true util.LogNilOrWarnf(start, "registry client do %s", op) @@ -399,6 +408,8 @@ func (c *EtcdClient) Txn(ctx context.Context, opts []registry.PluginOp) (*regist } func (c *EtcdClient) TxnWithCmp(ctx context.Context, success []registry.PluginOp, cmps []registry.CompareOp, fail []registry.PluginOp) (*registry.PluginResponse, error) { + var err error + otCtx, cancel := registry.WithTimeout(ctx) defer cancel() @@ -407,6 +418,9 @@ func (c *EtcdClient) TxnWithCmp(ctx context.Context, success []registry.PluginOp etcdSuccessOps := c.toTxnRequest(success) etcdFailOps := c.toTxnRequest(fail) + span := TracingBegin(ctx, "etcd:txn", success[0]) + defer TracingEnd(span, err) + kvc := clientv3.NewKV(c.Client) txn := kvc.Txn(otCtx) if len(etcdCmps) > 0 { @@ -428,6 +442,11 @@ func (c *EtcdClient) TxnWithCmp(ctx context.Context, success []registry.PluginOp } func (c *EtcdClient) LeaseGrant(ctx context.Context, TTL int64) (int64, error) { + var err 
error + span := TracingBegin(ctx, "etcd:grant", + registry.PluginOp{Action: registry.Put, Key: util.StringToBytesWithNoCopy(fmt.Sprint(TTL))}) + defer TracingEnd(span, err) + otCtx, cancel := registry.WithTimeout(ctx) defer cancel() start := time.Now() @@ -440,6 +459,11 @@ func (c *EtcdClient) LeaseGrant(ctx context.Context, TTL int64) (int64, error) { } func (c *EtcdClient) LeaseRenew(ctx context.Context, leaseID int64) (int64, error) { + var err error + span := TracingBegin(ctx, "etcd:keepalive", + registry.PluginOp{Action: registry.Put, Key: util.StringToBytesWithNoCopy(fmt.Sprint(leaseID))}) + defer TracingEnd(span, err) + otCtx, cancel := registry.WithTimeout(ctx) defer cancel() start := time.Now() @@ -455,10 +479,15 @@ func (c *EtcdClient) LeaseRenew(ctx context.Context, leaseID int64) (int64, erro } func (c *EtcdClient) LeaseRevoke(ctx context.Context, leaseID int64) error { + var err error + span := TracingBegin(ctx, "etcd:revoke", + registry.PluginOp{Action: registry.Delete, Key: util.StringToBytesWithNoCopy(fmt.Sprint(leaseID))}) + defer TracingEnd(span, err) + otCtx, cancel := registry.WithTimeout(ctx) defer cancel() start := time.Now() - _, err := c.Client.Revoke(otCtx, clientv3.LeaseID(leaseID)) + _, err = c.Client.Revoke(otCtx, clientv3.LeaseID(leaseID)) if err != nil { if err.Error() == grpc.ErrorDesc(rpctypes.ErrGRPCLeaseNotFound) { return err diff --git a/server/plugin/infra/registry/etcd/tracing.go b/server/plugin/infra/registry/etcd/tracing.go new file mode 100644 index 00000000..7e628fc8 --- /dev/null +++ b/server/plugin/infra/registry/etcd/tracing.go @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package etcd + +import ( + "github.com/apache/incubator-servicecomb-service-center/pkg/util" + "github.com/apache/incubator-servicecomb-service-center/server/infra/registry" + "github.com/apache/incubator-servicecomb-service-center/server/infra/tracing" + "github.com/apache/incubator-servicecomb-service-center/server/plugin" + "golang.org/x/net/context" + "net/http" +) + +func TracingBegin(ctx context.Context, operationName string, op registry.PluginOp) tracing.Span { + var action string + switch op.Action { + case registry.Get: + action = http.MethodGet + case registry.Put: + action = http.MethodPut + case registry.Delete: + action = http.MethodDelete + } + r, err := http.NewRequest(action, util.BytesToStringWithNoCopy(op.Key), nil) + if err != nil { + util.Logger().Errorf(err, "new backend request failed") + return nil + } + r = r.WithContext(ctx) + return plugin.Plugins().Tracing().ClientBegin(operationName, r) +} + +func TracingEnd(span tracing.Span, err error) { + if err != nil { + plugin.Plugins().Tracing().ClientEnd(span, http.StatusInternalServerError, err.Error()) + return + } + plugin.Plugins().Tracing().ClientEnd(span, http.StatusOK, "") +} diff --git a/server/plugin/infra/tracing/buildin/README.md b/server/plugin/infra/tracing/buildin/README.md new file mode 100644 index 00000000..140203e5 --- /dev/null +++ b/server/plugin/infra/tracing/buildin/README.md @@ -0,0 +1,34 @@ 
+# Report trace data + +## Edit the configuration of the tracing plugin +```bash +trace_plugin='buildin' # or empty +``` + +## To zipkin server + +[zipkin](/docs/tracing-server.PNG) + +### Add the zipkin server endpoint +``` +# Export the environments +export TRACING_COLLECTOR=server +export TRACING_SERVER_ADDRESS=http://127.0.0.1:9411 # zipkin server endpoint + +# Start the Service-center +./servicecenter +``` + +## To file + +[file](/docs/tracing-file.PNG) + +### Customize the path of trace data file +``` +# Export the environments +export TRACING_COLLECTOR=file +export TRACING_FILE_PATH=/tmp/servicecenter.trace # if not set, use ${work directory}/SERVICECENTER.trace + +# Start the Service-center +./servicecenter +``` diff --git a/server/plugin/infra/tracing/buildin/buildin.go b/server/plugin/infra/tracing/buildin/buildin.go new file mode 100644 index 00000000..bfae0863 --- /dev/null +++ b/server/plugin/infra/tracing/buildin/buildin.go @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package buildin + +import ( + "context" + "github.com/apache/incubator-servicecomb-service-center/pkg/util" + "github.com/apache/incubator-servicecomb-service-center/server/infra/tracing" + mgr "github.com/apache/incubator-servicecomb-service-center/server/plugin" + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/ext" + "github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore" + "net/http" + "sync" +) + +var once sync.Once + +func init() { + mgr.RegisterPlugin(mgr.Plugin{mgr.TRACING, "buildin", New}) +} + +func New() mgr.PluginInstance { + return &Zipkin{} +} + +type Zipkin struct { +} + +func (zp *Zipkin) ServerBegin(operationName string, itf tracing.Request) tracing.Span { + var ( + span opentracing.Span + ctx context.Context + ) + switch itf.(type) { + case *http.Request: + r := itf.(*http.Request) + ctx = r.Context() + + wireContext, err := ZipkinTracer().Extract(opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(r.Header)) + switch err { + case nil: + case opentracing.ErrSpanContextNotFound: + default: + util.Logger().Errorf(err, "tracer extract request failed") + return nil + } + + span = ZipkinTracer().StartSpan(operationName, ext.RPCServerOption(wireContext)) + ext.SpanKindRPCServer.Set(span) + ext.HTTPMethod.Set(span, r.Method) + ext.HTTPUrl.Set(span, r.URL.String()) + + span.SetTag("protocol", "HTTP") + span.SetTag(zipkincore.HTTP_PATH, r.URL.Path) + span.SetTag(zipkincore.HTTP_HOST, r.URL.Host) + default: + // grpc? 
+ return nil + } + + util.SetContext(ctx, tracing.CTX_TRACE_SPAN, span) + return span +} + +func (zp *Zipkin) ServerEnd(itf tracing.Span, code int, message string) { + span, ok := itf.(opentracing.Span) + if !ok { + return + } + setResultTags(span, code, message) + span.Finish() +} + +func (zp *Zipkin) ClientBegin(operationName string, itf tracing.Request) tracing.Span { + var ( + span opentracing.Span + ctx context.Context + carrier interface{} + ) + switch itf.(type) { + case *http.Request: + r := itf.(*http.Request) + ctx = r.Context() + + parentSpan, ok := ctx.Value(tracing.CTX_TRACE_SPAN).(opentracing.Span) + if !ok { + return nil + } + span = ZipkinTracer().StartSpan(operationName, opentracing.ChildOf(parentSpan.Context())) + ext.SpanKindRPCClient.Set(span) + ext.HTTPMethod.Set(span, r.Method) + ext.HTTPUrl.Set(span, r.URL.String()) + + span.SetTag("protocol", "HTTP") + span.SetTag(zipkincore.HTTP_PATH, r.URL.Path) + span.SetTag(zipkincore.HTTP_HOST, r.URL.Host) + + carrier = opentracing.HTTPHeadersCarrier(r.Header) + default: + // grpc? 
+ return nil + } + + util.SetContext(ctx, tracing.CTX_TRACE_SPAN, span) + + if err := ZipkinTracer().Inject( + span.Context(), + opentracing.HTTPHeaders, + carrier, + ); err != nil { + util.Logger().Errorf(err, "tracer inject request failed") + } + + return span +} + +func (zp *Zipkin) ClientEnd(itf tracing.Span, code int, message string) { + span, ok := itf.(opentracing.Span) + if !ok { + return + } + setResultTags(span, code, message) + span.Finish() +} + +func setResultTags(span opentracing.Span, code int, message string) { + if code >= http.StatusBadRequest && len(message) > 0 { + span.SetTag("error", message) + } + ext.HTTPStatusCode.Set(span, uint16(code)) +} diff --git a/server/plugin/infra/tracing/buildin/common.go b/server/plugin/infra/tracing/buildin/common.go new file mode 100644 index 00000000..16535373 --- /dev/null +++ b/server/plugin/infra/tracing/buildin/common.go @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package buildin + +import ( + "fmt" + "github.com/apache/incubator-servicecomb-service-center/pkg/util" + "github.com/apache/incubator-servicecomb-service-center/server/core" + "github.com/opentracing/opentracing-go" + zipkin "github.com/openzipkin/zipkin-go-opentracing" + "os" + "path/filepath" + "strings" +) + +func initTracer() { + collector, err := newCollector() + if err != nil { + util.Logger().Errorf(err, "new tracing collector failed, use the noop tracer") + return + } + ipPort, _ := util.ParseEndpoint(core.Instance.Endpoints[0]) + recorder := zipkin.NewRecorder(collector, false, ipPort, strings.ToLower(core.Service.ServiceName)) + tracer, err := zipkin.NewTracer(recorder, zipkin.TraceID128Bit(true)) + if err != nil { + util.Logger().Errorf(err, "new tracer failed") + return + } + opentracing.SetGlobalTracer(tracer) +} + +func newCollector() (collector zipkin.Collector, err error) { + ct := strings.TrimSpace(os.Getenv("TRACING_COLLECTOR")) + switch ct { + case "server": + sa := GetServerEndpoint() + collector, err = zipkin.NewHTTPCollector(sa + "/api/v1/spans") + if err != nil { + return + } + case "file": + fp := GetFilePath(core.Service.ServiceName + ".trace") + collector, err = NewFileCollector(fp) + if err != nil { + return + } + default: + err = fmt.Errorf("unknown tracing collector type '%s'", ct) + } + return +} + +func ZipkinTracer() opentracing.Tracer { + once.Do(initTracer) + return opentracing.GlobalTracer() +} + +func GetFilePath(defName string) string { + path := os.Getenv("TRACING_FILE_PATH") + if len(path) == 0 { + wd, _ := os.Getwd() + return filepath.Join(wd, defName) + } + return path +} + +func GetServerEndpoint() string { + sa := os.Getenv("TRACING_SERVER_ADDRESS") + if len(sa) == 0 { + sa = "http://127.0.0.1:9411" + } + return sa +} diff --git a/server/plugin/infra/tracing/buildin/file_collector.go b/server/plugin/infra/tracing/buildin/file_collector.go new file mode 100644 index 00000000..636e7aa7 --- /dev/null +++ 
b/server/plugin/infra/tracing/buildin/file_collector.go @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package buildin + +import ( + "bufio" + "encoding/json" + "fmt" + "github.com/apache/incubator-servicecomb-service-center/pkg/logrotate" + "github.com/apache/incubator-servicecomb-service-center/pkg/util" + "github.com/apache/incubator-servicecomb-service-center/server/core" + "github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore" + "os" + "time" +) + +type FileCollector struct { + Fd *os.File + Interval time.Duration + QueueSize int + c chan *zipkincore.Span +} + +func (f *FileCollector) Collect(span *zipkincore.Span) error { + if f.Fd == nil { + return fmt.Errorf("required FD to write") + } + + f.c <- span + return nil +} + +func (f *FileCollector) Close() error { + return f.Fd.Close() +} + +func (f *FileCollector) write(batch []*zipkincore.Span) (c int) { + if len(batch) == 0 { + return + } + newLine := [...]byte{'\n'} + w := bufio.NewWriter(f.Fd) + for _, span := range batch { + s := FromZipkinSpan(span) + b, err := json.Marshal(s) + if err != nil { + util.Logger().Errorf(err, "marshal span failed") + continue + } + w.Write(b) + w.Write(newLine[:]) + c++ + } + if err := 
w.Flush(); err != nil { + util.Logger().Errorf(err, "write span to file failed") + } + return +} + +func (f *FileCollector) loop(stopCh <-chan struct{}) { + var ( + batch []*zipkincore.Span + prev []*zipkincore.Span + i = f.Interval * 10 + t = time.NewTicker(f.Interval) + nr = time.Now().Add(i) + ) + for { + select { + case <-stopCh: + f.write(batch) + return + case span := <-f.c: + batch = append(batch, span) + if len(batch) >= f.QueueSize { + if c := f.write(batch); c == 0 { + continue + } + if prev != nil { + batch, prev = prev[:0], batch + } else { + prev, batch = batch, batch[len(batch):] // new one + } + } + case <-t.C: + if time.Now().After(nr) { + traceutils.LogRotateFile(f.Fd.Name(), + int(core.ServerInfo.Config.LogRotateSize), + int(core.ServerInfo.Config.LogBackupCount), + ) + nr = time.Now().Add(i) + } + + if c := f.write(batch); c > 0 { + batch = batch[:0] + } + } + } +} + +func NewFileCollector(path string) (*FileCollector, error) { + fd, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600) + if err != nil { + return nil, err + } + fc := &FileCollector{ + Fd: fd, + Interval: 10 * time.Second, + QueueSize: 100, + c: make(chan *zipkincore.Span, 1000), + } + util.Go(fc.loop) + return fc, nil +} diff --git a/server/plugin/infra/tracing/buildin/span.go b/server/plugin/infra/tracing/buildin/span.go new file mode 100644 index 00000000..32c09a78 --- /dev/null +++ b/server/plugin/infra/tracing/buildin/span.go @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package buildin + +import ( + "github.com/apache/incubator-servicecomb-service-center/pkg/util" + "github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore" + "github.com/openzipkin/zipkin-go-opentracing/types" + "strconv" +) + +type Span struct { + TraceID string `thrift:"trace_id,1" db:"trace_id" json:"traceId"` + // unused field # 2 + Name string `thrift:"name,3" db:"name" json:"name"` + ID string `thrift:"id,4" db:"id" json:"id"` + ParentID string `thrift:"parent_id,5" db:"parent_id" json:"parentId,omitempty"` + Annotations []*Annotation `thrift:"annotations,6" db:"annotations" json:"annotations"` + // unused field # 7 + BinaryAnnotations []*BinaryAnnotation `thrift:"binary_annotations,8" db:"binary_annotations" json:"binaryAnnotations"` + //Debug bool `thrift:"debug,9" db:"debug" json:"debug,omitempty"` + Timestamp *int64 `thrift:"timestamp,10" db:"timestamp" json:"timestamp,omitempty"` + Duration *int64 `thrift:"duration,11" db:"duration" json:"duration,omitempty"` + //TraceIDHigh *int64 `thrift:"trace_id_high,12" db:"trace_id_high" json:"trace_id_high,omitempty"` +} + +type Annotation struct { + Timestamp int64 `thrift:"timestamp,1" db:"timestamp" json:"timestamp"` + Value string `thrift:"value,2" db:"value" json:"value"` + Host *Endpoint `thrift:"host,3" db:"host" json:"host,omitempty"` +} + +type BinaryAnnotation struct { + Key string `thrift:"key,1" db:"key" json:"key"` + Value string `thrift:"value,2" db:"value" json:"value"` + //AnnotationType AnnotationType `thrift:"annotation_type,3" db:"annotation_type" 
json:"annotation_type"` + //Host *Endpoint `thrift:"host,4" db:"host" json:"host,omitempty"` +} + +type Endpoint struct { + Ipv4 string `thrift:"ipv4,1" db:"ipv4" json:"ipv4"` + Port int16 `thrift:"port,2" db:"port" json:"port"` + ServiceName string `thrift:"service_name,3" db:"service_name" json:"serviceName"` + Ipv6 []byte `thrift:"ipv6,4" db:"ipv6" json:"ipv6,omitempty"` +} + +func (s *Span) FromZipkinSpan(span *zipkincore.Span) { + traceId := new(types.TraceID) + traceId.Low = uint64(span.TraceID) + traceId.High = uint64(*(span.TraceIDHigh)) + s.TraceID = traceId.ToHex() + s.Duration = span.Duration + + s.ID = strconv.FormatUint(uint64(span.ID), 16) + if span.ParentID != nil { + s.ParentID = strconv.FormatUint(uint64(*(span.ParentID)), 16) + } + + s.Name = span.Name + s.Timestamp = span.Timestamp + + for _, a := range span.Annotations { + s.Annotations = append(s.Annotations, &Annotation{ + Timestamp: a.Timestamp, + Value: a.Value, + Host: &Endpoint{ + Ipv4: util.InetNtoa(uint32(a.Host.Ipv4)), + Port: a.Host.Port, + ServiceName: a.Host.ServiceName, + Ipv6: a.Host.Ipv6, + }, + }) + } + + for _, ba := range span.BinaryAnnotations { + if zipkincore.SERVER_ADDR == ba.Key { + continue + } + s.BinaryAnnotations = append(s.BinaryAnnotations, &BinaryAnnotation{ + Key: ba.Key, + Value: string(ba.Value), + }) + } +} + +func FromZipkinSpan(span *zipkincore.Span) (s Span) { + s.FromZipkinSpan(span) + return +} diff --git a/server/plugin/infra/tracing/buildin/span_test.go b/server/plugin/infra/tracing/buildin/span_test.go new file mode 100644 index 00000000..c3dceb49 --- /dev/null +++ b/server/plugin/infra/tracing/buildin/span_test.go @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package buildin + +import ( + "encoding/json" + "fmt" + "github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore" + "testing" +) + +var sample = []byte(`{ + "trace_id": 4081433150731846551, + "name": "api", + "id": 8350480249446290292, + "annotations": [ + { + "timestamp": 1517455386250894, + "value": "sr", + "host": { + "ipv4": 2130706433, + "port": 30100, + "service_name": "SERVICECENTER" + } + }, + { + "timestamp": 1517455386251872, + "value": "ss", + "host": { + "ipv4": 2130706433, + "port": 30100, + "service_name": "SERVICECENTER" + } + } + ], + "binary_annotations": [ + { + "key": "span.kind", + "value": "c2VydmVy", + "annotation_type": "STRING", + "host": { + "ipv4": 2130706433, + "port": 30100, + "service_name": "SERVICECENTER" + } + }, + { + "key": "protocol", + "value": "SFRUUA==", + "annotation_type": "STRING", + "host": { + "ipv4": 2130706433, + "port": 30100, + "service_name": "SERVICECENTER" + } + }, + { + "key": "http.method", + "value": "R0VU", + "annotation_type": "STRING", + "host": { + "ipv4": 2130706433, + "port": 30100, + "service_name": "SERVICECENTER" + } + }, + { + "key": "http.url", + "value": "L3Y0L2RlZmF1bHQvcmVnaXN0cnkvbWljcm9zZXJ2aWNlcz8lM0Fwcm9qZWN0PWRlZmF1bHQm", + "annotation_type": "STRING", + "host": { + "ipv4": 2130706433, + "port": 30100, + "service_name": "SERVICECENTER" + } + }, + { + "key": "http.path", + "value": 
"L3Y0L2RlZmF1bHQvcmVnaXN0cnkvbWljcm9zZXJ2aWNlcw==", + "annotation_type": "STRING", + "host": { + "ipv4": 2130706433, + "port": 30100, + "service_name": "SERVICECENTER" + } + }, + { + "key": "http.host", + "value": "", + "annotation_type": "STRING", + "host": { + "ipv4": 2130706433, + "port": 30100, + "service_name": "SERVICECENTER" + } + }, + { + "key": "resultCode", + "value": "MjAw", + "annotation_type": "STRING", + "host": { + "ipv4": 2130706433, + "port": 30100, + "service_name": "SERVICECENTER" + } + }, + { + "key": "result", + "value": "MA==", + "annotation_type": "STRING", + "host": { + "ipv4": 2130706433, + "port": 30100, + "service_name": "SERVICECENTER" + } + }, + { + "key": "http.status_code", + "value": "MjAw", + "annotation_type": "STRING", + "host": { + "ipv4": 2130706433, + "port": 30100, + "service_name": "SERVICECENTER" + } + } + ], + "timestamp": 1517455386250894, + "duration": 978, + "trace_id_high": 7511721612091346172 +}`) + +func TestFromZipkinSpan(t *testing.T) { + span := &zipkincore.Span{} + err := json.Unmarshal(sample, &span) + if err != nil { + fmt.Println("TestFromZipkinSpan Unmarshal", err) + t.FailNow() + } + s := FromZipkinSpan(span) + b, err := json.Marshal(s) + if err != nil { + fmt.Println("TestFromZipkinSpan Marshal", err) + t.FailNow() + } + fmt.Println(string(b)) +} diff --git a/server/plugin/plugin.go b/server/plugin/plugin.go index 02ffa064..308a7132 100644 --- a/server/plugin/plugin.go +++ b/server/plugin/plugin.go @@ -25,6 +25,7 @@ import ( "github.com/apache/incubator-servicecomb-service-center/server/infra/quota" "github.com/apache/incubator-servicecomb-service-center/server/infra/registry" "github.com/apache/incubator-servicecomb-service-center/server/infra/security" + "github.com/apache/incubator-servicecomb-service-center/server/infra/tracing" "github.com/apache/incubator-servicecomb-service-center/server/infra/uuid" "github.com/astaxie/beego" pg "plugin" @@ -40,6 +41,7 @@ const ( CIPHER QUOTA REGISTRY + TRACING 
typeEnd ) @@ -50,6 +52,7 @@ var pluginNames = map[PluginName]string{ CIPHER: "cipher", QUOTA: "quota", REGISTRY: "registry", + TRACING: "trace", } var pluginMgr = &PluginManager{} @@ -216,6 +219,10 @@ func (pm *PluginManager) Quota() quota.QuotaManager { return pm.Instance(QUOTA).(quota.QuotaManager) } +func (pm *PluginManager) Tracing() tracing.Tracing { + return pm.Instance(TRACING).(tracing.Tracing) +} + func Plugins() *PluginManager { return pluginMgr } diff --git a/server/service/service_suite_test.go b/server/service/service_suite_test.go index d9c0cb9e..e64a8c4a 100644 --- a/server/service/service_suite_test.go +++ b/server/service/service_suite_test.go @@ -21,6 +21,7 @@ import ( pb "github.com/apache/incubator-servicecomb-service-center/server/core/proto" _ "github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/quota/buildin" _ "github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/registry/etcd" + _ "github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/tracing/buildin" _ "github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/uuid/buildin" "github.com/apache/incubator-servicecomb-service-center/server/service" . "github.com/onsi/ginkgo" ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services