This is an automated email from the ASF dual-hosted git repository.
littlecui pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/incubator-servicecomb-service-center.git
The following commit(s) were added to refs/heads/master by this push:
new d6b0b3a SCB-321 Support report trace data (#267)
d6b0b3a is described below
commit d6b0b3ac55ae0ad3cd35525c7cf6a42ff9ad54ad
Author: little-cui <[email protected]>
AuthorDate: Fri Feb 2 18:01:10 2018 +0800
SCB-321 Support report trace data (#267)
* SCB-321 Add tracing handler
* SCB-321 Support report trace data
* SCB-321 Support report trace data
* SCB-321 Add README.md
* SCB-321 Fix UT failure.
* SCB-321 Optimize trace rotate.
* SCB-321 Optimize trace rotate.
* SCB-321 Optimize trace rotate.
* SCB-321 Optimize zipkin plugin.
* SCB-321 Optimize zipkin plugin.
* SCB-321 Optimize zipkin plugin.
* SCB-321 Optimize zipkin plugin.
---
docs/tracing-file.PNG | Bin 0 -> 32339 bytes
docs/tracing-server.PNG | Bin 0 -> 32308 bytes
etc/conf/app.conf | 10 +-
pkg/chain/callback.go | 18 +-
pkg/chain/chain.go | 8 +-
pkg/chain/chain_test.go | 48 +++--
pkg/chain/invocation.go | 50 ++++-
pkg/rest/common.go | 1 +
pkg/rest/route.go | 14 +-
pkg/util/context.go | 164 +++++++++++++++++
pkg/util/net.go | 111 ++++++++++++
pkg/util/{sys.go => net_test.go} | 58 +++---
pkg/util/sys.go | 23 +--
pkg/util/util.go | 201 ---------------------
server/bootstrap/bootstrap.go | 5 +
server/handler/metric/metric.go | 7 +-
server/handler/tracing/tracing.go | 53 ++----
.../common.go => server/infra/tracing/tracing.go | 31 +---
server/plugin/README.md | 1 +
server/plugin/infra/registry/etcd/etcd.go | 35 +++-
server/plugin/infra/registry/etcd/tracing.go | 53 ++++++
server/plugin/infra/tracing/buildin/README.md | 34 ++++
server/plugin/infra/tracing/buildin/buildin.go | 146 +++++++++++++++
server/plugin/infra/tracing/buildin/common.go | 87 +++++++++
.../plugin/infra/tracing/buildin/file_collector.go | 128 +++++++++++++
server/plugin/infra/tracing/buildin/span.go | 103 +++++++++++
server/plugin/infra/tracing/buildin/span_test.go | 161 +++++++++++++++++
server/plugin/plugin.go | 7 +
server/service/service_suite_test.go | 1 +
29 files changed, 1198 insertions(+), 360 deletions(-)
diff --git a/docs/tracing-file.PNG b/docs/tracing-file.PNG
new file mode 100644
index 0000000..e6ee116
Binary files /dev/null and b/docs/tracing-file.PNG differ
diff --git a/docs/tracing-server.PNG b/docs/tracing-server.PNG
new file mode 100644
index 0000000..70b7c0c
Binary files /dev/null and b/docs/tracing-server.PNG differ
diff --git a/etc/conf/app.conf b/etc/conf/app.conf
index a48f08c..a4fb8dc 100644
--- a/etc/conf/app.conf
+++ b/etc/conf/app.conf
@@ -47,12 +47,20 @@ cipher_plugin = ""
#suppot buildin, unlimit
quota_plugin = ""
-# access control plugin
+#access control plugin
auth_plugin = ""
#support om, manage
auditlog_plugin = ""
+#tracing: buildin(zipkin)
+# buildin(zipkin): Can export TRACING_COLLECTOR env variable to select
+# collector type, 'server' means report trace data
+# to zipkin server address specified by TRACING_SERVER_ADDRESS
+# env variable; 'file' means just output a file stored
+# in path specified by TRACING_FILE_PATH env variable
+trace_plugin = ""
+
###################################################################
# rate limit options
###################################################################
diff --git a/pkg/chain/callback.go b/pkg/chain/callback.go
index 69be2ce..9b2e7fa 100644
--- a/pkg/chain/callback.go
+++ b/pkg/chain/callback.go
@@ -17,7 +17,6 @@
package chain
import (
- "fmt"
"github.com/apache/incubator-servicecomb-service-center/pkg/util"
)
@@ -31,28 +30,33 @@ func (r Result) String() string {
if r.OK {
return "OK"
}
- return fmt.Sprintf("FAIL(error: %s)", r.Err)
+ return r.Err.Error()
}
type Callback struct {
- Func func(r Result)
+ Func func(r Result)
+ Async bool
}
func (cb *Callback) Invoke(r Result) {
- go cb.syncInvoke(r)
+ if cb.Async {
+ go syncInvoke(cb.Func, r)
+ return
+ }
+ syncInvoke(cb.Func, r)
}
-func (cb *Callback) syncInvoke(r Result) {
+func syncInvoke(f func(r Result), r Result) {
defer func() {
if itf := recover(); itf != nil {
util.LogPanic(itf)
}
}()
- if cb.Func == nil {
+ if f == nil {
util.Logger().Errorf(nil, "Callback function is nil. result:
%s,", r)
return
}
- cb.Func(r)
+ f(r)
}
func (cb *Callback) Fail(err error, args ...interface{}) {
diff --git a/pkg/chain/chain.go b/pkg/chain/chain.go
index a48fc1c..8cfb4dd 100644
--- a/pkg/chain/chain.go
+++ b/pkg/chain/chain.go
@@ -30,10 +30,7 @@ type Chain struct {
func (c *Chain) Init(chainName string, hs []Handler) {
c.name = chainName
c.currentIndex = -1
- if len(hs) > 0 {
- c.handlers = make([]Handler, len(hs))
- copy(c.handlers, hs)
- }
+ c.handlers = hs
}
func (c *Chain) Name() string {
@@ -63,8 +60,7 @@ func (c *Chain) Next(i *Invocation) {
c.syncNext(i)
}
-func NewChain(name string, handlers []Handler) Chain {
- var ch Chain
+func NewChain(name string, handlers []Handler) (ch Chain) {
ch.Init(name, handlers)
return ch
}
diff --git a/pkg/chain/chain_test.go b/pkg/chain/chain_test.go
index 8d67fa8..3e5b8c0 100644
--- a/pkg/chain/chain_test.go
+++ b/pkg/chain/chain_test.go
@@ -22,53 +22,51 @@ import (
"testing"
)
+const (
+ times = 1000000
+ count = 100
+)
+
+func init() {
+ for i := 0; i < count; i++ {
+ chain.RegisterHandler("_bench_handlers_", &handler{})
+ }
+}
+
type handler struct {
}
func (h *handler) Handle(i *chain.Invocation) {
- counter := i.Context().Value("counter").(int)
- counter++
- i.WithContext("counter", counter)
i.Next()
}
-func syncFunc(ctx map[string]interface{}) {
- counter := ctx["counter"].(int)
- counter++
- ctx["counter"] = counter
+func syncFunc() {
+ for i := 0; i < count; i++ {
+ }
}
func BenchmarkChain(b *testing.B) {
- b.N = 100
- if len(chain.Handlers("_bench_handlers_")) == 0 {
- for i := 0; i < b.N; i++ {
- chain.RegisterHandler("_bench_handlers_", &handler{})
- }
- }
+ var (
+ ctx = context.Background()
+ f = func(r chain.Result) {}
+ )
+ b.N = times
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
- ch := make(chan struct{})
- inv := chain.NewInvocation(context.Background(),
- chain.NewChain("_bench_chain_",
chain.Handlers("_bench_handlers_")))
- inv.WithContext("counter", 0)
- inv.Invoke(func(r chain.Result) { close(ch) })
- <-ch
+ inv := chain.NewInvocation(ctx,
chain.NewChain("_bench_chain_", chain.Handlers("_bench_handlers_")))
+ inv.Next(chain.WithFunc(f))
}
})
b.ReportAllocs()
}
func BenchmarkSync(b *testing.B) {
- b.N = 100
+ b.N = times
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
- ctx := make(map[string]interface{}, 1)
- ctx["counter"] = 0
- for i := 0; i < b.N; i++ {
- syncFunc(ctx)
- }
+ syncFunc()
}
})
b.ReportAllocs()
diff --git a/pkg/chain/invocation.go b/pkg/chain/invocation.go
index 173cb7f..0ab0df7 100644
--- a/pkg/chain/invocation.go
+++ b/pkg/chain/invocation.go
@@ -21,6 +21,20 @@ import (
"golang.org/x/net/context"
)
+type InvocationOption func(op InvocationOp) InvocationOp
+
+type InvocationOp struct {
+ Func func(r Result)
+ Async bool
+}
+
+func WithFunc(f func(r Result)) InvocationOption {
+ return func(op InvocationOp) InvocationOp { op.Func = f; return op }
+}
+func WithAsyncFunc(f func(r Result)) InvocationOption {
+ return func(op InvocationOp) InvocationOp { op.Func = f; op.Async =
true; return op }
+}
+
type Invocation struct {
Callback
context *util.StringContext
@@ -41,17 +55,45 @@ func (i *Invocation) WithContext(key string, val
interface{}) *Invocation {
return i
}
-func (i *Invocation) Next() {
+func (i *Invocation) Next(opts ...InvocationOption) {
+ var op InvocationOp
+ for _, opt := range opts {
+ op = opt(op)
+ }
+
+ i.setCallback(op.Func, op.Async)
i.chain.Next(i)
}
+func (i *Invocation) setCallback(f func(r Result), async bool) {
+ if f == nil {
+ return
+ }
+
+ if i.Func == nil {
+ i.Func = f
+ i.Async = async
+ return
+ }
+
+ cb := i.Func
+ i.Func = func(r Result) {
+ cb(r)
+ callback(f, async, r)
+ }
+}
+
+func callback(f func(r Result), async bool, r Result) {
+ c := Callback{Func: f, Async: async}
+ c.Invoke(r)
+}
+
func (i *Invocation) Invoke(f func(r Result)) {
i.Func = f
- i.Next()
+ i.chain.Next(i)
}
-func NewInvocation(ctx context.Context, ch Chain) Invocation {
- var inv Invocation
+func NewInvocation(ctx context.Context, ch Chain) (inv Invocation) {
inv.Init(ctx, ch)
return inv
}
diff --git a/pkg/rest/common.go b/pkg/rest/common.go
index 4e135fb..1010494 100644
--- a/pkg/rest/common.go
+++ b/pkg/rest/common.go
@@ -29,6 +29,7 @@ const (
CTX_RESPONSE = "_server_response"
CTX_REQUEST = "_server_request"
CTX_MATCH_PATTERN = "_server_match_pattern"
+ CTX_MATCH_FUNC = "_server_match_func"
SERVER_CHAIN_NAME = "_server_chain"
)
diff --git a/pkg/rest/route.go b/pkg/rest/route.go
index 3015aff..e3eb7c5 100644
--- a/pkg/rest/route.go
+++ b/pkg/rest/route.go
@@ -33,6 +33,7 @@ type URLPattern struct {
}
type urlPatternHandler struct {
+ Name string
Path string
http.Handler
}
@@ -68,7 +69,8 @@ func (this *ROAServerHandler) addRoute(route *Route) (err
error) {
return errors.New(message)
}
- this.handlers[method] = append(this.handlers[method],
&urlPatternHandler{route.Path, http.HandlerFunc(route.Func)})
+ this.handlers[method] = append(this.handlers[method],
&urlPatternHandler{
+ util.FormatFuncName(util.FuncName(route.Func)), route.Path,
http.HandlerFunc(route.Func)})
util.Logger().Infof("register route %s(%s).", route.Path, method)
return nil
@@ -121,14 +123,13 @@ func (this *ROAServerHandler) serve(ph
*urlPatternHandler, w http.ResponseWriter
*r = *nr
}
- ch := make(chan struct{})
inv := chain.NewInvocation(ctx, chain.NewChain(SERVER_CHAIN_NAME, hs))
inv.WithContext(CTX_RESPONSE, w).
WithContext(CTX_REQUEST, r).
- WithContext(CTX_MATCH_PATTERN, ph.Path)
- inv.Invoke(func(ret chain.Result) {
+ WithContext(CTX_MATCH_PATTERN, ph.Path).
+ WithContext(CTX_MATCH_FUNC, ph.Name)
+ inv.Next(chain.WithFunc(func(ret chain.Result) {
defer func() {
- defer close(ch)
err := ret.Err
itf := recover()
if itf != nil {
@@ -148,8 +149,7 @@ func (this *ROAServerHandler) serve(ph *urlPatternHandler,
w http.ResponseWriter
if ret.OK {
ph.ServeHTTP(w, r)
}
- })
- <-ch
+ }))
}
func (this *urlPatternHandler) try(path string) (p string, _ bool) {
diff --git a/pkg/util/context.go b/pkg/util/context.go
new file mode 100644
index 0000000..41b41e4
--- /dev/null
+++ b/pkg/util/context.go
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package util
+
+import (
+ "golang.org/x/net/context"
+ "net/http"
+ "time"
+)
+
+type StringContext struct {
+ parentCtx context.Context
+ kv map[string]interface{}
+}
+
+func (c *StringContext) Deadline() (deadline time.Time, ok bool) {
+ return c.parentCtx.Deadline()
+}
+
+func (c *StringContext) Done() <-chan struct{} {
+ return c.parentCtx.Done()
+}
+
+func (c *StringContext) Err() error {
+ return c.parentCtx.Err()
+}
+
+func (c *StringContext) Value(key interface{}) interface{} {
+ k, ok := key.(string)
+ if !ok {
+ return c.parentCtx.Value(key)
+ }
+ return c.kv[k]
+}
+
+func (c *StringContext) SetKV(key string, val interface{}) {
+ c.kv[key] = val
+}
+
+func NewStringContext(ctx context.Context) *StringContext {
+ strCtx, ok := ctx.(*StringContext)
+ if !ok {
+ strCtx = &StringContext{
+ parentCtx: ctx,
+ kv: make(map[string]interface{}, 10),
+ }
+ }
+ return strCtx
+}
+
+func SetContext(ctx context.Context, key string, val interface{})
context.Context {
+ strCtx := NewStringContext(ctx)
+ strCtx.SetKV(key, val)
+ return strCtx
+}
+
+func CloneContext(ctx context.Context) context.Context {
+ strCtx := &StringContext{
+ parentCtx: ctx,
+ kv: make(map[string]interface{}, 10),
+ }
+
+ old, ok := ctx.(*StringContext)
+ if !ok {
+ return strCtx
+ }
+
+ for k, v := range old.kv {
+ strCtx.kv[k] = v
+ }
+ return strCtx
+}
+
+func FromContext(ctx context.Context, key string) interface{} {
+ return ctx.Value(key)
+}
+
+func SetRequestContext(r *http.Request, key string, val interface{})
*http.Request {
+ ctx := r.Context()
+ ctx = SetContext(ctx, key, val)
+ if ctx != r.Context() {
+ nr := r.WithContext(ctx)
+ *r = *nr
+ }
+ return r
+}
+
+func ParseDomainProject(ctx context.Context) string {
+ return ParseDomain(ctx) + "/" + ParseProject(ctx)
+}
+
+func ParseTargetDomainProject(ctx context.Context) string {
+ return ParseTargetDomain(ctx) + "/" + ParseTargetProject(ctx)
+}
+
+func ParseDomain(ctx context.Context) string {
+ v, ok := FromContext(ctx, "domain").(string)
+ if !ok {
+ return ""
+ }
+ return v
+}
+
+func ParseTargetDomain(ctx context.Context) string {
+ v, _ := FromContext(ctx, "target-domain").(string)
+ if len(v) == 0 {
+ return ParseDomain(ctx)
+ }
+ return v
+}
+
+func ParseProject(ctx context.Context) string {
+ v, ok := FromContext(ctx, "project").(string)
+ if !ok {
+ return ""
+ }
+ return v
+}
+
+func ParseTargetProject(ctx context.Context) string {
+ v, _ := FromContext(ctx, "target-project").(string)
+ if len(v) == 0 {
+ return ParseProject(ctx)
+ }
+ return v
+}
+
+func SetDomain(ctx context.Context, domain string) context.Context {
+ return SetContext(ctx, "domain", domain)
+}
+
+func SetProject(ctx context.Context, project string) context.Context {
+ return SetContext(ctx, "project", project)
+}
+
+func SetTargetDomain(ctx context.Context, domain string) context.Context {
+ return SetContext(ctx, "target-domain", domain)
+}
+
+func SetTargetProject(ctx context.Context, project string) context.Context {
+ return SetContext(ctx, "target-project", project)
+}
+
+func SetDomainProject(ctx context.Context, domain string, project string)
context.Context {
+ return SetProject(SetDomain(ctx, domain), project)
+}
+
+func SetTargetDomainProject(ctx context.Context, domain string, project
string) context.Context {
+ return SetTargetProject(SetTargetDomain(ctx, domain), project)
+}
diff --git a/pkg/util/net.go b/pkg/util/net.go
new file mode 100644
index 0000000..d73e928
--- /dev/null
+++ b/pkg/util/net.go
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package util
+
+import (
+ "golang.org/x/net/context"
+ "net"
+ "net/http"
+ "net/url"
+ "strings"
+)
+
+func GetIPFromContext(ctx context.Context) string {
+ v, ok := FromContext(ctx, "x-remote-ip").(string)
+ if !ok {
+ return ""
+ }
+ return v
+}
+
+func UrlEncode(keys map[string]string) string {
+ l := len(keys)
+ if l == 0 {
+ return ""
+ }
+ arr := make([]string, 0, l)
+ for k, v := range keys {
+ arr = append(arr, url.QueryEscape(k)+"="+url.QueryEscape(v))
+ }
+ return StringJoin(arr, "&")
+}
+
+func ParseEndpoint(ep string) (string, error) {
+ u, err := url.Parse(ep)
+ if err != nil {
+ return "", err
+ }
+ port := u.Port()
+ if len(port) > 0 {
+ return u.Hostname() + ":" + port, nil
+ }
+ return u.Hostname(), nil
+}
+
+func GetRealIP(r *http.Request) string {
+ for _, h := range [2]string{"X-Forwarded-For", "X-Real-Ip"} {
+ addresses := strings.Split(r.Header.Get(h), ",")
+ for _, ip := range addresses {
+ ip = strings.TrimSpace(ip)
+ realIP := net.ParseIP(ip)
+ if !realIP.IsGlobalUnicast() {
+ continue
+ }
+ return ip
+ }
+ }
+ addrs := strings.Split(r.RemoteAddr, ":")
+ if len(addrs) > 0 {
+ return addrs[0]
+ }
+ return ""
+}
+
+func GetLocalIP() string {
+ addrs, err := net.InterfaceAddrs()
+ if err != nil {
+ return ""
+ }
+ for _, address := range addrs {
+ // check the address type and if it is not a loopback the
display it
+ if ipnet, ok := address.(*net.IPNet); ok &&
!ipnet.IP.IsLoopback() {
+ if ipnet.IP.To4() != nil {
+ return ipnet.IP.String()
+ }
+ }
+ }
+ return ""
+}
+
+func InetNtoIP(ipnr uint32) net.IP {
+ return net.IPv4(byte(ipnr>>24), byte(ipnr>>16), byte(ipnr>>8),
byte(ipnr))
+}
+
+func InetNtoa(ipnr uint32) string {
+ return InetNtoIP(ipnr).String()
+}
+
+func InetAton(ip string) (ipnr uint32) {
+ bytes := net.ParseIP(ip).To4()
+ for i := 0; i < len(bytes); i++ {
+ ipnr |= uint32(bytes[i])
+ if i < 3 {
+ ipnr <<= 8
+ }
+ }
+ return
+}
diff --git a/pkg/util/sys.go b/pkg/util/net_test.go
similarity index 51%
copy from pkg/util/sys.go
copy to pkg/util/net_test.go
index dae6651..b61b99d 100644
--- a/pkg/util/sys.go
+++ b/pkg/util/net_test.go
@@ -16,35 +16,43 @@
*/
package util
-import (
- "net"
- "unsafe"
-)
+import "testing"
-const INT_SIZE int = int(unsafe.Sizeof(0))
+const (
+ ip1 = "127.0.0.1" // 2130706433
+ ip2 = "0.0.0.0" // 0
+ ip3 = "255.255.255.255" // 4294967295
+ n1 = 2130706433 // "127.0.0.1"
+ n2 = 0 // "0.0.0.0"
+ n3 = 4294967295 // "255.255.255.255"
+)
-func GetLocalIP() string {
- addrs, err := net.InterfaceAddrs()
- if err != nil {
- return ""
+func TestInetAton(t *testing.T) {
+ i := InetAton(ip1)
+ if i != 2130706433 {
+ fail(t, "InetAton(%s) error", ip1)
}
- for _, address := range addrs {
- // check the address type and if it is not a loopback the
display it
- if ipnet, ok := address.(*net.IPNet); ok &&
!ipnet.IP.IsLoopback() {
- if ipnet.IP.To4() != nil {
- return ipnet.IP.String()
- }
- }
+ i = InetAton(ip2)
+ if i != 0 {
+ fail(t, "InetAton(%s) error", ip2)
+ }
+ i = InetAton(ip3)
+ if i != 4294967295 {
+ fail(t, "InetAton(%s) error", ip3)
}
- return ""
-}
-
-func IsBigEndian() bool {
- return !IsLittleEndian()
}
-func IsLittleEndian() bool {
- i := 0x1
- bs := (*[INT_SIZE]byte)(unsafe.Pointer(&i))
- return bs[0] == 0
+func TestInetNtoa(t *testing.T) {
+ ip := InetNtoa(n1)
+ if ip != ip1 {
+ fail(t, "InetNtoa(%d) error", n1)
+ }
+ ip = InetNtoa(n2)
+ if ip != ip2 {
+ fail(t, "InetNtoa(%d) error", n2)
+ }
+ ip = InetNtoa(n3)
+ if ip != ip3 {
+ fail(t, "InetNtoa(%d) error", n3)
+ }
}
diff --git a/pkg/util/sys.go b/pkg/util/sys.go
index dae6651..8a9a23d 100644
--- a/pkg/util/sys.go
+++ b/pkg/util/sys.go
@@ -17,28 +17,12 @@
package util
import (
- "net"
+ "os"
"unsafe"
)
const INT_SIZE int = int(unsafe.Sizeof(0))
-func GetLocalIP() string {
- addrs, err := net.InterfaceAddrs()
- if err != nil {
- return ""
- }
- for _, address := range addrs {
- // check the address type and if it is not a loopback the
display it
- if ipnet, ok := address.(*net.IPNet); ok &&
!ipnet.IP.IsLoopback() {
- if ipnet.IP.To4() != nil {
- return ipnet.IP.String()
- }
- }
- }
- return ""
-}
-
func IsBigEndian() bool {
return !IsLittleEndian()
}
@@ -48,3 +32,8 @@ func IsLittleEndian() bool {
bs := (*[INT_SIZE]byte)(unsafe.Pointer(&i))
return bs[0] == 0
}
+
+func PathExist(path string) bool {
+ _, err := os.Stat(path)
+ return err == nil || os.IsExist(err)
+}
diff --git a/pkg/util/util.go b/pkg/util/util.go
index 17997a6..554af47 100644
--- a/pkg/util/util.go
+++ b/pkg/util/util.go
@@ -20,10 +20,6 @@ import (
"bytes"
"encoding/gob"
"fmt"
- "golang.org/x/net/context"
- "net"
- "net/http"
- "net/url"
"os"
"reflect"
"runtime"
@@ -33,11 +29,6 @@ import (
"unsafe"
)
-func PathExist(path string) bool {
- _, err := os.Stat(path)
- return err == nil || os.IsExist(err)
-}
-
func MinInt(x, y int) int {
if x <= y {
return x
@@ -68,155 +59,6 @@ func ClearByteMemory(src []byte) {
}
}
-type StringContext struct {
- parentCtx context.Context
- kv map[string]interface{}
-}
-
-func (c *StringContext) Deadline() (deadline time.Time, ok bool) {
- return c.parentCtx.Deadline()
-}
-
-func (c *StringContext) Done() <-chan struct{} {
- return c.parentCtx.Done()
-}
-
-func (c *StringContext) Err() error {
- return c.parentCtx.Err()
-}
-
-func (c *StringContext) Value(key interface{}) interface{} {
- k, ok := key.(string)
- if !ok {
- return c.parentCtx.Value(key)
- }
- return c.kv[k]
-}
-
-func (c *StringContext) SetKV(key string, val interface{}) {
- c.kv[key] = val
-}
-
-func NewStringContext(ctx context.Context) *StringContext {
- strCtx, ok := ctx.(*StringContext)
- if !ok {
- strCtx = &StringContext{
- parentCtx: ctx,
- kv: make(map[string]interface{}, 10),
- }
- }
- return strCtx
-}
-
-func SetContext(ctx context.Context, key string, val interface{})
context.Context {
- strCtx := NewStringContext(ctx)
- strCtx.SetKV(key, val)
- return strCtx
-}
-
-func CloneContext(ctx context.Context) context.Context {
- strCtx := &StringContext{
- parentCtx: ctx,
- kv: make(map[string]interface{}, 10),
- }
-
- old, ok := ctx.(*StringContext)
- if !ok {
- return strCtx
- }
-
- for k, v := range old.kv {
- strCtx.kv[k] = v
- }
- return strCtx
-}
-
-func FromContext(ctx context.Context, key string) interface{} {
- return ctx.Value(key)
-}
-
-func SetRequestContext(r *http.Request, key string, val interface{})
*http.Request {
- ctx := r.Context()
- ctx = SetContext(ctx, key, val)
- if ctx != r.Context() {
- nr := r.WithContext(ctx)
- *r = *nr
- }
- return r
-}
-
-func ParseDomainProject(ctx context.Context) string {
- return ParseDomain(ctx) + "/" + ParseProject(ctx)
-}
-
-func ParseTargetDomainProject(ctx context.Context) string {
- return ParseTargetDomain(ctx) + "/" + ParseTargetProject(ctx)
-}
-
-func ParseDomain(ctx context.Context) string {
- v, ok := FromContext(ctx, "domain").(string)
- if !ok {
- return ""
- }
- return v
-}
-
-func ParseTargetDomain(ctx context.Context) string {
- v, _ := FromContext(ctx, "target-domain").(string)
- if len(v) == 0 {
- return ParseDomain(ctx)
- }
- return v
-}
-
-func ParseProject(ctx context.Context) string {
- v, ok := FromContext(ctx, "project").(string)
- if !ok {
- return ""
- }
- return v
-}
-
-func ParseTargetProject(ctx context.Context) string {
- v, _ := FromContext(ctx, "target-project").(string)
- if len(v) == 0 {
- return ParseProject(ctx)
- }
- return v
-}
-
-func SetDomain(ctx context.Context, domain string) context.Context {
- return SetContext(ctx, "domain", domain)
-}
-
-func SetProject(ctx context.Context, project string) context.Context {
- return SetContext(ctx, "project", project)
-}
-
-func SetTargetDomain(ctx context.Context, domain string) context.Context {
- return SetContext(ctx, "target-domain", domain)
-}
-
-func SetTargetProject(ctx context.Context, project string) context.Context {
- return SetContext(ctx, "target-project", project)
-}
-
-func SetDomainProject(ctx context.Context, domain string, project string)
context.Context {
- return SetProject(SetDomain(ctx, domain), project)
-}
-
-func SetTargetDomainProject(ctx context.Context, domain string, project
string) context.Context {
- return SetTargetProject(SetTargetDomain(ctx, domain), project)
-}
-
-func GetIPFromContext(ctx context.Context) string {
- v, ok := FromContext(ctx, "x-remote-ip").(string)
- if !ok {
- return ""
- }
- return v
-}
-
func DeepCopy(dst, src interface{}) error {
var buf bytes.Buffer
if err := gob.NewEncoder(&buf).Encode(src); err != nil {
@@ -325,37 +167,6 @@ func GetCaller(skip int) (string, string, int, bool) {
return file, method, line, ok
}
-func ParseEndpoint(ep string) (string, error) {
- u, err := url.Parse(ep)
- if err != nil {
- return "", err
- }
- port := u.Port()
- if len(port) > 0 {
- return u.Hostname() + ":" + port, nil
- }
- return u.Hostname(), nil
-}
-
-func GetRealIP(r *http.Request) string {
- for _, h := range [2]string{"X-Forwarded-For", "X-Real-Ip"} {
- addresses := strings.Split(r.Header.Get(h), ",")
- for _, ip := range addresses {
- ip = strings.TrimSpace(ip)
- realIP := net.ParseIP(ip)
- if !realIP.IsGlobalUnicast() {
- continue
- }
- return ip
- }
- }
- addrs := strings.Split(r.RemoteAddr, ":")
- if len(addrs) > 0 {
- return addrs[0]
- }
- return ""
-}
-
func BytesToInt32(bs []byte) (in int32) {
l := len(bs)
if l > 4 || l == 0 {
@@ -376,18 +187,6 @@ func BytesToInt32(bs []byte) (in int32) {
return
}
-func UrlEncode(keys map[string]string) string {
- l := len(keys)
- if l == 0 {
- return ""
- }
- arr := make([]string, 0, l)
- for k, v := range keys {
- arr = append(arr, url.QueryEscape(k)+"="+url.QueryEscape(v))
- }
- return StringJoin(arr, "&")
-}
-
func FormatFuncName(f string) string {
i := strings.LastIndex(f, "/")
j := strings.Index(f[i+1:], ".")
diff --git a/server/bootstrap/bootstrap.go b/server/bootstrap/bootstrap.go
index 5c50e65..b92c45a 100644
--- a/server/bootstrap/bootstrap.go
+++ b/server/bootstrap/bootstrap.go
@@ -39,6 +39,9 @@ import _
"github.com/apache/incubator-servicecomb-service-center/server/plugin/i
// uuid
import _
"github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/uuid/buildin"
+// tracing
+import _
"github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/tracing/buildin"
+
// module
import _ "github.com/apache/incubator-servicecomb-service-center/server/govern"
@@ -48,6 +51,7 @@ import (
"github.com/apache/incubator-servicecomb-service-center/server/handler/cache"
"github.com/apache/incubator-servicecomb-service-center/server/handler/context"
"github.com/apache/incubator-servicecomb-service-center/server/handler/metric"
+
"github.com/apache/incubator-servicecomb-service-center/server/handler/tracing"
"github.com/apache/incubator-servicecomb-service-center/server/interceptor"
"github.com/apache/incubator-servicecomb-service-center/server/interceptor/access"
"github.com/apache/incubator-servicecomb-service-center/server/interceptor/cors"
@@ -64,6 +68,7 @@ func init() {
// handle requests after routing.
metric.RegisterHandlers()
+ tracing.RegisterHandlers()
auth.RegisterHandlers()
context.RegisterHandlers()
cache.RegisterHandlers()
diff --git a/server/handler/metric/metric.go b/server/handler/metric/metric.go
index 18dcae4..42d2f8a 100644
--- a/server/handler/metric/metric.go
+++ b/server/handler/metric/metric.go
@@ -31,17 +31,14 @@ type MetricsHandler struct {
func (h *MetricsHandler) Handle(i *chain.Invocation) {
w, r := i.Context().Value(rest.CTX_RESPONSE).(http.ResponseWriter),
i.Context().Value(rest.CTX_REQUEST).(*http.Request)
- cb := i.Func
- i.Invoke(func(ret chain.Result) {
- cb(ret)
-
+ i.Next(chain.WithAsyncFunc(func(ret chain.Result) {
start, ok :=
i.Context().Value(svr.CTX_START_TIMESTAMP).(time.Time)
if !ok {
return
}
svr.ReportRequestCompleted(w, r, start)
util.LogNilOrWarnf(start, "%s %s", r.Method, r.RequestURI)
- })
+ }))
}
func RegisterHandlers() {
diff --git a/server/handler/tracing/tracing.go
b/server/handler/tracing/tracing.go
index caa04db..09776c1 100644
--- a/server/handler/tracing/tracing.go
+++ b/server/handler/tracing/tracing.go
@@ -19,50 +19,29 @@ package tracing
import (
"github.com/apache/incubator-servicecomb-service-center/pkg/chain"
"github.com/apache/incubator-servicecomb-service-center/pkg/rest"
- "github.com/apache/incubator-servicecomb-service-center/server/core"
- "github.com/opentracing/opentracing-go"
- "github.com/opentracing/opentracing-go/ext"
- zipkin "github.com/openzipkin/zipkin-go-opentracing"
- "github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore"
+ "github.com/apache/incubator-servicecomb-service-center/server/plugin"
"net/http"
+ "strconv"
)
-var tracer opentracing.Tracer
-
-func init() {
- collector, err := zipkin.NewHTTPCollector("127.0.0.1:9411")
- if err != nil {
- return
- }
- recorder := zipkin.NewRecorder(collector, false, "0.0.0.0:0",
core.Service.ServiceName)
- tracer, err = zipkin.NewTracer(recorder, zipkin.TraceID128Bit(true))
-}
-
type TracingHandler struct {
}
func (h *TracingHandler) Handle(i *chain.Invocation) {
- w, request :=
i.Context().Value(rest.CTX_RESPONSE).(http.ResponseWriter),
- i.Context().Value(rest.CTX_REQUEST).(*http.Request)
- ctx, err := tracer.Extract(opentracing.TextMap,
opentracing.HTTPHeadersCarrier(request.Header))
- switch err {
- case nil:
- case opentracing.ErrSpanContextNotFound:
- default:
- }
-
- span := tracer.StartSpan("api", ext.RPCServerOption(ctx))
- ext.SpanKindRPCServer.Set(span)
-
- cb := i.Func
- i.Invoke(func(r chain.Result) {
- cb(r)
- span.SetTag(zipkincore.HTTP_METHOD, request.Method)
- span.SetTag(zipkincore.HTTP_PATH, request.RequestURI)
- span.SetTag(zipkincore.HTTP_STATUS_CODE,
w.Header().Get("X-Response-Status"))
- span.SetTag(zipkincore.HTTP_HOST, request.URL.Host)
- span.Finish()
- })
+ w, r, op := i.Context().Value(rest.CTX_RESPONSE).(http.ResponseWriter),
+ i.Context().Value(rest.CTX_REQUEST).(*http.Request),
+ i.Context().Value(rest.CTX_MATCH_FUNC).(string)
+
+ span := plugin.Plugins().Tracing().ServerBegin(op, r)
+
+ i.Next(chain.WithAsyncFunc(func(ret chain.Result) {
+ statusCode := w.Header().Get("X-Response-Status")
+ code, _ := strconv.ParseInt(statusCode, 10, 64)
+ if code == 0 {
+ code = 200
+ }
+ plugin.Plugins().Tracing().ServerEnd(span, int(code),
statusCode)
+ }))
}
func RegisterHandlers() {
diff --git a/pkg/rest/common.go b/server/infra/tracing/tracing.go
similarity index 60%
copy from pkg/rest/common.go
copy to server/infra/tracing/tracing.go
index 4e135fb..3e5d674 100644
--- a/pkg/rest/common.go
+++ b/server/infra/tracing/tracing.go
@@ -14,29 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package rest
+package tracing
-import (
- "net/http"
-)
+const CTX_TRACE_SPAN = "x-trace-span"
-const (
- HTTP_METHOD_GET = http.MethodGet
- HTTP_METHOD_PUT = http.MethodPut
- HTTP_METHOD_POST = http.MethodPost
- HTTP_METHOD_DELETE = http.MethodDelete
+type Request interface{}
+type Span interface{}
- CTX_RESPONSE = "_server_response"
- CTX_REQUEST = "_server_request"
- CTX_MATCH_PATTERN = "_server_match_pattern"
- SERVER_CHAIN_NAME = "_server_chain"
-)
-
-func isValidMethod(method string) bool {
- switch method {
- case http.MethodGet, http.MethodPut, http.MethodPost, http.MethodDelete:
- return true
- default:
- return false
- }
+type Tracing interface {
+ ServerBegin(operationName string, r Request) Span
+ ServerEnd(span Span, code int, message string)
+ ClientBegin(operationName string, r Request) Span
+ ClientEnd(span Span, code int, message string)
}
diff --git a/server/plugin/README.md b/server/plugin/README.md
index 89ff610..c11ed53 100644
--- a/server/plugin/README.md
+++ b/server/plugin/README.md
@@ -10,6 +10,7 @@
1. auditlog, Customize audit log for any change done to the service-center.
1. cipher, Customize encryption and decryption of TLS certificate private key
password.
1. quota, Customize quota for instance registry.
+1. tracing, Customize tracing data reporter.
## Example: an authentication plug-in
diff --git a/server/plugin/infra/registry/etcd/etcd.go
b/server/plugin/infra/registry/etcd/etcd.go
index 1e57ee1..7ffe5a6 100644
--- a/server/plugin/infra/registry/etcd/etcd.go
+++ b/server/plugin/infra/registry/etcd/etcd.go
@@ -324,13 +324,20 @@ func (c *EtcdClient) paging(ctx context.Context, op
registry.PluginOp) (*clientv
}
func (c *EtcdClient) Do(ctx context.Context, opts ...registry.PluginOpOption)
(*registry.PluginResponse, error) {
+ var (
+ err error
+ resp *registry.PluginResponse
+ )
+
start := time.Now()
op := registry.OptionsToOp(opts...)
+ span := TracingBegin(ctx, "etcd:do", op)
+ defer TracingEnd(span, err)
+
otCtx, cancel := registry.WithTimeout(ctx)
defer cancel()
- var err error
- var resp *registry.PluginResponse
+
switch op.Action {
case registry.Get:
var etcdResp *clientv3.GetResponse
@@ -378,9 +385,11 @@ func (c *EtcdClient) Do(ctx context.Context, opts
...registry.PluginOpOption) (*
Revision: etcdResp.Header.Revision,
}
}
+
if err != nil {
return nil, err
}
+
resp.Succeeded = true
util.LogNilOrWarnf(start, "registry client do %s", op)
@@ -399,6 +408,8 @@ func (c *EtcdClient) Txn(ctx context.Context, opts
[]registry.PluginOp) (*regist
}
func (c *EtcdClient) TxnWithCmp(ctx context.Context, success
[]registry.PluginOp, cmps []registry.CompareOp, fail []registry.PluginOp)
(*registry.PluginResponse, error) {
+ var err error
+
otCtx, cancel := registry.WithTimeout(ctx)
defer cancel()
@@ -407,6 +418,9 @@ func (c *EtcdClient) TxnWithCmp(ctx context.Context,
success []registry.PluginOp
etcdSuccessOps := c.toTxnRequest(success)
etcdFailOps := c.toTxnRequest(fail)
+ span := TracingBegin(ctx, "etcd:txn", success[0])
+ defer TracingEnd(span, err)
+
kvc := clientv3.NewKV(c.Client)
txn := kvc.Txn(otCtx)
if len(etcdCmps) > 0 {
@@ -428,6 +442,11 @@ func (c *EtcdClient) TxnWithCmp(ctx context.Context,
success []registry.PluginOp
}
func (c *EtcdClient) LeaseGrant(ctx context.Context, TTL int64) (int64, error)
{
+ var err error
+ span := TracingBegin(ctx, "etcd:grant",
+ registry.PluginOp{Action: registry.Put, Key:
util.StringToBytesWithNoCopy(fmt.Sprint(TTL))})
+ defer TracingEnd(span, err)
+
otCtx, cancel := registry.WithTimeout(ctx)
defer cancel()
start := time.Now()
@@ -440,6 +459,11 @@ func (c *EtcdClient) LeaseGrant(ctx context.Context, TTL
int64) (int64, error) {
}
func (c *EtcdClient) LeaseRenew(ctx context.Context, leaseID int64) (int64,
error) {
+ var err error
+ span := TracingBegin(ctx, "etcd:keepalive",
+ registry.PluginOp{Action: registry.Put, Key:
util.StringToBytesWithNoCopy(fmt.Sprint(leaseID))})
+ defer TracingEnd(span, err)
+
otCtx, cancel := registry.WithTimeout(ctx)
defer cancel()
start := time.Now()
@@ -455,10 +479,15 @@ func (c *EtcdClient) LeaseRenew(ctx context.Context,
leaseID int64) (int64, erro
}
func (c *EtcdClient) LeaseRevoke(ctx context.Context, leaseID int64) error {
+ var err error
+ span := TracingBegin(ctx, "etcd:revoke",
+ registry.PluginOp{Action: registry.Delete, Key:
util.StringToBytesWithNoCopy(fmt.Sprint(leaseID))})
+ defer TracingEnd(span, err)
+
otCtx, cancel := registry.WithTimeout(ctx)
defer cancel()
start := time.Now()
- _, err := c.Client.Revoke(otCtx, clientv3.LeaseID(leaseID))
+ _, err = c.Client.Revoke(otCtx, clientv3.LeaseID(leaseID))
if err != nil {
if err.Error() == grpc.ErrorDesc(rpctypes.ErrGRPCLeaseNotFound)
{
return err
diff --git a/server/plugin/infra/registry/etcd/tracing.go
b/server/plugin/infra/registry/etcd/tracing.go
new file mode 100644
index 0000000..7e628fc
--- /dev/null
+++ b/server/plugin/infra/registry/etcd/tracing.go
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package etcd
+
+import (
+ "github.com/apache/incubator-servicecomb-service-center/pkg/util"
+
"github.com/apache/incubator-servicecomb-service-center/server/infra/registry"
+
"github.com/apache/incubator-servicecomb-service-center/server/infra/tracing"
+ "github.com/apache/incubator-servicecomb-service-center/server/plugin"
+ "golang.org/x/net/context"
+ "net/http"
+)
+
+func TracingBegin(ctx context.Context, operationName string, op
registry.PluginOp) tracing.Span {
+ var action string
+ switch op.Action {
+ case registry.Get:
+ action = http.MethodGet
+ case registry.Put:
+ action = http.MethodPut
+ case registry.Delete:
+ action = http.MethodDelete
+ }
+ r, err := http.NewRequest(action, util.BytesToStringWithNoCopy(op.Key),
nil)
+ if err != nil {
+ util.Logger().Errorf(err, "new backend request failed")
+ return nil
+ }
+ r = r.WithContext(ctx)
+ return plugin.Plugins().Tracing().ClientBegin(operationName, r)
+}
+
+// TracingEnd finishes a span started by TracingBegin. A non-nil err is
+// reported as HTTP 500 with err.Error() as the message; otherwise the span is
+// closed with HTTP 200 and an empty message.
+//
+// NOTE(review): err is an ordinary argument, so `defer TracingEnd(span, err)`
+// captures the value err holds when the defer statement executes, not the
+// value at return. Callers that want the final error must defer a closure,
+// e.g. `defer func() { TracingEnd(span, err) }()`.
+func TracingEnd(span tracing.Span, err error) {
+	code, message := http.StatusOK, ""
+	if err != nil {
+		code, message = http.StatusInternalServerError, err.Error()
+	}
+	plugin.Plugins().Tracing().ClientEnd(span, code, message)
+}
diff --git a/server/plugin/infra/tracing/buildin/README.md
b/server/plugin/infra/tracing/buildin/README.md
new file mode 100644
index 0000000..140203e
--- /dev/null
+++ b/server/plugin/infra/tracing/buildin/README.md
@@ -0,0 +1,34 @@
+# Report trace data
+
+## Edit the configuration of the tracing plugin
+```bash
+trace_plugin='buildin' # or empty
+```
+
+## To zipkin server
+
+![zipkin](/docs/tracing-server.PNG)
+
+### Add the zipkin server endpoint
+```
+# Export the environment variables
+export TRACING_COLLECTOR=server
+export TRACING_SERVER_ADDRESS=http://127.0.0.1:9411 # zipkin server endpoint
+
+# Start the Service-center
+./servicecenter
+```
+
+## To file
+
+![file](/docs/tracing-file.PNG)
+
+### Customize the path of trace data file
+```
+# Export the environment variables
+export TRACING_COLLECTOR=file
+export TRACING_FILE_PATH=/tmp/servicecenter.trace # if not set, use ${work
directory}/SERVICECENTER.trace
+
+# Start the Service-center
+./servicecenter
+```
diff --git a/server/plugin/infra/tracing/buildin/buildin.go
b/server/plugin/infra/tracing/buildin/buildin.go
new file mode 100644
index 0000000..bfae086
--- /dev/null
+++ b/server/plugin/infra/tracing/buildin/buildin.go
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package buildin
+
+import (
+ "context"
+ "github.com/apache/incubator-servicecomb-service-center/pkg/util"
+
"github.com/apache/incubator-servicecomb-service-center/server/infra/tracing"
+ mgr
"github.com/apache/incubator-servicecomb-service-center/server/plugin"
+ "github.com/opentracing/opentracing-go"
+ "github.com/opentracing/opentracing-go/ext"
+ "github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore"
+ "net/http"
+ "sync"
+)
+
+var once sync.Once // makes ZipkinTracer run initTracer only once
+
+// init registers this package as the "buildin" implementation of the TRACING
+// plugin type, with New as its constructor.
+func init() {
+	p := mgr.Plugin{mgr.TRACING, "buildin", New}
+	mgr.RegisterPlugin(p)
+}
+
+// New returns a fresh Zipkin tracing plugin instance.
+func New() mgr.PluginInstance {
+	return new(Zipkin)
+}
+
+type Zipkin struct {
+}
+
+// ServerBegin starts a server-side span named operationName for an inbound
+// request. Only *http.Request is supported; any other request type yields nil.
+// Trace context found in the request headers is used as the parent of the new
+// span. The span is recorded through util.SetContext on the request's context
+// under tracing.CTX_TRACE_SPAN and then returned. It returns nil when header
+// extraction fails for a reason other than "no span context found".
+func (zp *Zipkin) ServerBegin(operationName string, itf tracing.Request) tracing.Span {
+	r, ok := itf.(*http.Request)
+	if !ok {
+		// grpc?
+		return nil
+	}
+	var ctx context.Context = r.Context()
+
+	wireContext, err := ZipkinTracer().Extract(opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(r.Header))
+	switch err {
+	case nil:
+	case opentracing.ErrSpanContextNotFound:
+		// No upstream trace in the headers: the span starts without a parent.
+	default:
+		util.Logger().Errorf(err, "tracer extract request failed")
+		return nil
+	}
+
+	// For inbound requests net/http normally fills only URL.Path/RawQuery and
+	// exposes the requested host as r.Host, so fall back to it when the URL
+	// carries no host; otherwise the http.host tag is always empty.
+	host := r.URL.Host
+	if len(host) == 0 {
+		host = r.Host
+	}
+
+	span := ZipkinTracer().StartSpan(operationName, ext.RPCServerOption(wireContext))
+	ext.SpanKindRPCServer.Set(span)
+	ext.HTTPMethod.Set(span, r.Method)
+	ext.HTTPUrl.Set(span, r.URL.String())
+
+	span.SetTag("protocol", "HTTP")
+	span.SetTag(zipkincore.HTTP_PATH, r.URL.Path)
+	span.SetTag(zipkincore.HTTP_HOST, host)
+
+	util.SetContext(ctx, tracing.CTX_TRACE_SPAN, span)
+	return span
+}
+
+// ServerEnd tags the span with the result code/message and finishes it.
+// Values that are not an opentracing.Span (including nil) are ignored.
+func (zp *Zipkin) ServerEnd(itf tracing.Span, code int, message string) {
+	if span, ok := itf.(opentracing.Span); ok {
+		setResultTags(span, code, message)
+		span.Finish()
+	}
+}
+
+// ClientBegin starts a client-side span named operationName as a child of the
+// span found in the request's context under tracing.CTX_TRACE_SPAN. Only
+// *http.Request is supported. It returns nil for other request types or when
+// the context holds no parent span. The new span is recorded through
+// util.SetContext under the same key and its trace context is injected into
+// the request headers; an injection failure is only logged.
+func (zp *Zipkin) ClientBegin(operationName string, itf tracing.Request) tracing.Span {
+	r, isHTTP := itf.(*http.Request)
+	if !isHTTP {
+		// grpc?
+		return nil
+	}
+	var ctx context.Context = r.Context()
+
+	parentSpan, found := ctx.Value(tracing.CTX_TRACE_SPAN).(opentracing.Span)
+	if !found {
+		return nil
+	}
+
+	span := ZipkinTracer().StartSpan(operationName, opentracing.ChildOf(parentSpan.Context()))
+	ext.SpanKindRPCClient.Set(span)
+	ext.HTTPMethod.Set(span, r.Method)
+	ext.HTTPUrl.Set(span, r.URL.String())
+
+	span.SetTag("protocol", "HTTP")
+	span.SetTag(zipkincore.HTTP_PATH, r.URL.Path)
+	span.SetTag(zipkincore.HTTP_HOST, r.URL.Host)
+
+	util.SetContext(ctx, tracing.CTX_TRACE_SPAN, span)
+
+	var carrier interface{} = opentracing.HTTPHeadersCarrier(r.Header)
+	err := ZipkinTracer().Inject(span.Context(), opentracing.HTTPHeaders, carrier)
+	if err != nil {
+		util.Logger().Errorf(err, "tracer inject request failed")
+	}
+	return span
+}
+
+// ClientEnd tags the span with the result code/message and finishes it.
+// Values that are not an opentracing.Span (including nil) are ignored.
+func (zp *Zipkin) ClientEnd(itf tracing.Span, code int, message string) {
+	if span, ok := itf.(opentracing.Span); ok {
+		setResultTags(span, code, message)
+		span.Finish()
+	}
+}
+
+// setResultTags records the outcome on span: an "error" tag carrying message
+// when code is an HTTP error status (>= 400) and message is non-empty, and the
+// HTTP status code tag in every case.
+func setResultTags(span opentracing.Span, code int, message string) {
+	failed := code >= http.StatusBadRequest
+	if failed && message != "" {
+		span.SetTag("error", message)
+	}
+	ext.HTTPStatusCode.Set(span, uint16(code))
+}
diff --git a/server/plugin/infra/tracing/buildin/common.go
b/server/plugin/infra/tracing/buildin/common.go
new file mode 100644
index 0000000..1653537
--- /dev/null
+++ b/server/plugin/infra/tracing/buildin/common.go
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package buildin
+
+import (
+ "fmt"
+ "github.com/apache/incubator-servicecomb-service-center/pkg/util"
+ "github.com/apache/incubator-servicecomb-service-center/server/core"
+ "github.com/opentracing/opentracing-go"
+ zipkin "github.com/openzipkin/zipkin-go-opentracing"
+ "os"
+ "path/filepath"
+ "strings"
+)
+
+func initTracer() {
+ collector, err := newCollector()
+ if err != nil {
+ util.Logger().Errorf(err, "new tracing collector failed, use
the noop tracer")
+ return
+ }
+ ipPort, _ := util.ParseEndpoint(core.Instance.Endpoints[0])
+ recorder := zipkin.NewRecorder(collector, false, ipPort,
strings.ToLower(core.Service.ServiceName))
+ tracer, err := zipkin.NewTracer(recorder, zipkin.TraceID128Bit(true))
+ if err != nil {
+ util.Logger().Errorf(err, "new tracer failed")
+ return
+ }
+ opentracing.SetGlobalTracer(tracer)
+}
+
+// newCollector creates the span collector selected by the TRACING_COLLECTOR
+// environment variable (surrounding whitespace is ignored):
+//
+//	"server"  HTTP collector posting to GetServerEndpoint() + "/api/v1/spans"
+//	"file"    file collector writing to GetFilePath(<service name> + ".trace")
+//
+// Any other value, including an unset variable, is an error.
+func newCollector() (collector zipkin.Collector, err error) {
+	kind := strings.TrimSpace(os.Getenv("TRACING_COLLECTOR"))
+	if kind == "server" {
+		collector, err = zipkin.NewHTTPCollector(GetServerEndpoint() + "/api/v1/spans")
+		return
+	}
+	if kind == "file" {
+		collector, err = NewFileCollector(GetFilePath(core.Service.ServiceName + ".trace"))
+		return
+	}
+	err = fmt.Errorf("unknown tracing collector type '%s'", kind)
+	return
+}
+
+// ZipkinTracer returns the opentracing global tracer, running initTracer
+// first if it has not been run yet.
+func ZipkinTracer() opentracing.Tracer {
+	once.Do(initTracer)
+	tracer := opentracing.GlobalTracer()
+	return tracer
+}
+
+// GetFilePath returns the trace file path: the value of the
+// TRACING_FILE_PATH environment variable when it is set and non-empty,
+// otherwise defName joined onto the current working directory.
+func GetFilePath(defName string) string {
+	if path := os.Getenv("TRACING_FILE_PATH"); len(path) > 0 {
+		return path
+	}
+	// An error from Getwd is ignored; wd is then empty and defName is
+	// returned relative.
+	wd, _ := os.Getwd()
+	return filepath.Join(wd, defName)
+}
+
+// GetServerEndpoint returns the zipkin server address: the value of the
+// TRACING_SERVER_ADDRESS environment variable when it is set and non-empty,
+// otherwise "http://127.0.0.1:9411".
+func GetServerEndpoint() string {
+	if sa := os.Getenv("TRACING_SERVER_ADDRESS"); len(sa) > 0 {
+		return sa
+	}
+	return "http://127.0.0.1:9411"
+}
diff --git a/server/plugin/infra/tracing/buildin/file_collector.go
b/server/plugin/infra/tracing/buildin/file_collector.go
new file mode 100644
index 0000000..636e7aa
--- /dev/null
+++ b/server/plugin/infra/tracing/buildin/file_collector.go
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package buildin
+
+import (
+ "bufio"
+ "encoding/json"
+ "fmt"
+ "github.com/apache/incubator-servicecomb-service-center/pkg/logrotate"
+ "github.com/apache/incubator-servicecomb-service-center/pkg/util"
+ "github.com/apache/incubator-servicecomb-service-center/server/core"
+ "github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore"
+ "os"
+ "time"
+)
+
+type FileCollector struct { // FileCollector collects spans and writes them to a file, one JSON document per line.
+ Fd *os.File // destination file; Collect rejects spans while it is nil
+ Interval time.Duration // period of the flush ticker in loop; rotation is attempted every 10 intervals
+ QueueSize int // batch length at which loop flushes without waiting for the ticker
+ c chan *zipkincore.Span // hand-off queue from Collect to loop
+}
+
+// Collect queues span for the background writer. It returns an error when the
+// collector has no file to write to. The send blocks while the queue is full.
+func (f *FileCollector) Collect(span *zipkincore.Span) error {
+	if f.Fd != nil {
+		f.c <- span
+		return nil
+	}
+	return fmt.Errorf("required FD to write")
+}
+
+// Close closes the underlying file and returns the result.
+// NOTE(review): it neither flushes queued spans nor stops the background
+// loop, which ends only when its stop channel fires.
+func (f *FileCollector) Close() error {
+	fd := f.Fd
+	return fd.Close()
+}
+
+// write converts every span of batch with FromZipkinSpan, serialises it as
+// JSON and appends it to the file followed by a newline, using one buffered
+// writer for the whole batch. Spans that fail to marshal are logged and
+// skipped. It returns the number of spans that were serialised; a flush
+// failure is logged but does not change that count.
+func (f *FileCollector) write(batch []*zipkincore.Span) (c int) {
+	if len(batch) == 0 {
+		return 0
+	}
+	out := bufio.NewWriter(f.Fd)
+	for i := range batch {
+		line, err := json.Marshal(FromZipkinSpan(batch[i]))
+		if err != nil {
+			util.Logger().Errorf(err, "marshal span failed")
+			continue
+		}
+		// Write errors are sticky in bufio.Writer and surface at Flush.
+		out.Write(line)
+		out.WriteByte('\n')
+		c++
+	}
+	if err := out.Flush(); err != nil {
+		util.Logger().Errorf(err, "write span to file failed")
+	}
+	return c
+}
+
+// loop is the background writer. It drains the queue filled by Collect into
+// an in-memory batch and writes the batch to the file when
+//
+//   - the batch reaches QueueSize spans,
+//   - the ticker fires (every Interval), or
+//   - stopCh fires, after which loop returns.
+//
+// On a tick that falls later than 10*Interval after the previous rotation
+// attempt, log rotation is requested for the file before flushing.
+func (f *FileCollector) loop(stopCh <-chan struct{}) {
+	var (
+		batch []*zipkincore.Span
+		prev  []*zipkincore.Span
+		i     = f.Interval * 10
+		t     = time.NewTicker(f.Interval)
+		nr    = time.Now().Add(i)
+	)
+	// Release the ticker when the loop ends; it was never stopped before.
+	defer t.Stop()
+	for {
+		select {
+		case <-stopCh:
+			f.write(batch)
+			return
+		case span := <-f.c:
+			batch = append(batch, span)
+			if len(batch) >= f.QueueSize {
+				if c := f.write(batch); c == 0 {
+					// Nothing was serialised: keep the batch as it is.
+					continue
+				}
+				// Alternate between two slices instead of allocating a new
+				// batch after every flush.
+				if prev != nil {
+					batch, prev = prev[:0], batch
+				} else {
+					prev, batch = batch, batch[len(batch):] // new one
+				}
+			}
+		case <-t.C:
+			if time.Now().After(nr) {
+				traceutils.LogRotateFile(f.Fd.Name(),
+					int(core.ServerInfo.Config.LogRotateSize),
+					int(core.ServerInfo.Config.LogBackupCount),
+				)
+				nr = time.Now().Add(i)
+			}
+
+			if c := f.write(batch); c > 0 {
+				batch = batch[:0]
+			}
+		}
+	}
+}
+
+// NewFileCollector opens path for appending (creating it with mode 0600 if
+// needed) and returns a collector that flushes every 10 seconds or every 100
+// spans, with a queue of 1000 spans. The background loop is started through
+// util.Go before returning.
+func NewFileCollector(path string) (*FileCollector, error) {
+	flags := os.O_APPEND | os.O_CREATE | os.O_RDWR
+	fd, err := os.OpenFile(path, flags, 0600)
+	if err != nil {
+		return nil, err
+	}
+
+	fc := new(FileCollector)
+	fc.Fd = fd
+	fc.Interval = 10 * time.Second
+	fc.QueueSize = 100
+	fc.c = make(chan *zipkincore.Span, 1000)
+
+	util.Go(fc.loop)
+	return fc, nil
+}
diff --git a/server/plugin/infra/tracing/buildin/span.go
b/server/plugin/infra/tracing/buildin/span.go
new file mode 100644
index 0000000..32c09a7
--- /dev/null
+++ b/server/plugin/infra/tracing/buildin/span.go
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package buildin
+
+import (
+ "github.com/apache/incubator-servicecomb-service-center/pkg/util"
+ "github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore"
+ "github.com/openzipkin/zipkin-go-opentracing/types"
+ "strconv"
+)
+
+type Span struct { // Span is the JSON form of a zipkincore.Span written by the file collector; IDs are hex strings.
+ TraceID string `thrift:"trace_id,1" db:"trace_id" json:"traceId"` // hex of the high and low trace id words
+ // unused field # 2
+ Name string `thrift:"name,3" db:"name" json:"name"`
+ ID string `thrift:"id,4" db:"id" json:"id"` // span id in base 16
+ ParentID string `thrift:"parent_id,5" db:"parent_id"
json:"parentId,omitempty"`
+ Annotations []*Annotation `thrift:"annotations,6" db:"annotations"
json:"annotations"`
+ // unused field # 7
+ BinaryAnnotations []*BinaryAnnotation `thrift:"binary_annotations,8"
db:"binary_annotations" json:"binaryAnnotations"`
+ //Debug bool `thrift:"debug,9" db:"debug" json:"debug,omitempty"`
+ Timestamp *int64 `thrift:"timestamp,10" db:"timestamp"
json:"timestamp,omitempty"`
+ Duration *int64 `thrift:"duration,11" db:"duration"
json:"duration,omitempty"`
+ //TraceIDHigh *int64 `thrift:"trace_id_high,12" db:"trace_id_high"
json:"trace_id_high,omitempty"`
+}
+
+type Annotation struct { // Annotation mirrors a zipkincore annotation with its host in the local Endpoint form.
+ Timestamp int64 `thrift:"timestamp,1" db:"timestamp"
json:"timestamp"`
+ Value string `thrift:"value,2" db:"value" json:"value"`
+ Host *Endpoint `thrift:"host,3" db:"host" json:"host,omitempty"` // omitted from JSON when nil
+}
+
+type BinaryAnnotation struct { // BinaryAnnotation is a key/value tag of a span.
+ Key string `thrift:"key,1" db:"key" json:"key"`
+ Value string `thrift:"value,2" db:"value" json:"value"` // the zipkincore byte value converted to a string
+ //AnnotationType AnnotationType `thrift:"annotation_type,3"
db:"annotation_type" json:"annotation_type"`
+ //Host *Endpoint `thrift:"host,4" db:"host" json:"host,omitempty"`
+}
+
+type Endpoint struct { // Endpoint identifies the host that recorded an annotation.
+ Ipv4 string `thrift:"ipv4,1" db:"ipv4" json:"ipv4"` // string produced by util.InetNtoa from the numeric zipkincore address
+ Port int16 `thrift:"port,2" db:"port" json:"port"`
+ ServiceName string `thrift:"service_name,3" db:"service_name"
json:"serviceName"`
+ Ipv6 []byte `thrift:"ipv6,4" db:"ipv6" json:"ipv6,omitempty"`
+}
+
+// FromZipkinSpan fills s from a zipkincore span. The trace id is rendered as
+// hex from its high and low words, the span and parent ids in base 16, and
+// binary annotation values are converted to strings. Annotations and binary
+// annotations are appended to whatever s already holds. Binary annotations
+// keyed zipkincore.SERVER_ADDR are dropped.
+//
+// The optional fields TraceIDHigh, ParentID and an annotation's Host may be
+// nil; they are then left at their zero value instead of being dereferenced.
+func (s *Span) FromZipkinSpan(span *zipkincore.Span) {
+	traceId := new(types.TraceID)
+	traceId.Low = uint64(span.TraceID)
+	if span.TraceIDHigh != nil {
+		// Absent for 64-bit trace ids.
+		traceId.High = uint64(*(span.TraceIDHigh))
+	}
+	s.TraceID = traceId.ToHex()
+	s.Duration = span.Duration
+
+	s.ID = strconv.FormatUint(uint64(span.ID), 16)
+	if span.ParentID != nil {
+		s.ParentID = strconv.FormatUint(uint64(*(span.ParentID)), 16)
+	}
+
+	s.Name = span.Name
+	s.Timestamp = span.Timestamp
+
+	for _, a := range span.Annotations {
+		annotation := &Annotation{
+			Timestamp: a.Timestamp,
+			Value:     a.Value,
+		}
+		if a.Host != nil {
+			annotation.Host = &Endpoint{
+				Ipv4:        util.InetNtoa(uint32(a.Host.Ipv4)),
+				Port:        a.Host.Port,
+				ServiceName: a.Host.ServiceName,
+				Ipv6:        a.Host.Ipv6,
+			}
+		}
+		s.Annotations = append(s.Annotations, annotation)
+	}
+
+	for _, ba := range span.BinaryAnnotations {
+		if zipkincore.SERVER_ADDR == ba.Key {
+			continue
+		}
+		s.BinaryAnnotations = append(s.BinaryAnnotations, &BinaryAnnotation{
+			Key:   ba.Key,
+			Value: string(ba.Value),
+		})
+	}
+}
+
+// FromZipkinSpan converts a zipkincore span into a new Span value.
+func FromZipkinSpan(span *zipkincore.Span) (s Span) {
+	(&s).FromZipkinSpan(span)
+	return s
+}
diff --git a/server/plugin/infra/tracing/buildin/span_test.go
b/server/plugin/infra/tracing/buildin/span_test.go
new file mode 100644
index 0000000..c3dceb4
--- /dev/null
+++ b/server/plugin/infra/tracing/buildin/span_test.go
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package buildin
+
+import (
+ "encoding/json"
+ "fmt"
+ "github.com/openzipkin/zipkin-go-opentracing/thrift/gen-go/zipkincore"
+ "testing"
+)
+
+var sample = []byte(`{
+ "trace_id": 4081433150731846551,
+ "name": "api",
+ "id": 8350480249446290292,
+ "annotations": [
+ {
+ "timestamp": 1517455386250894,
+ "value": "sr",
+ "host": {
+ "ipv4": 2130706433,
+ "port": 30100,
+ "service_name": "SERVICECENTER"
+ }
+ },
+ {
+ "timestamp": 1517455386251872,
+ "value": "ss",
+ "host": {
+ "ipv4": 2130706433,
+ "port": 30100,
+ "service_name": "SERVICECENTER"
+ }
+ }
+ ],
+ "binary_annotations": [
+ {
+ "key": "span.kind",
+ "value": "c2VydmVy",
+ "annotation_type": "STRING",
+ "host": {
+ "ipv4": 2130706433,
+ "port": 30100,
+ "service_name": "SERVICECENTER"
+ }
+ },
+ {
+ "key": "protocol",
+ "value": "SFRUUA==",
+ "annotation_type": "STRING",
+ "host": {
+ "ipv4": 2130706433,
+ "port": 30100,
+ "service_name": "SERVICECENTER"
+ }
+ },
+ {
+ "key": "http.method",
+ "value": "R0VU",
+ "annotation_type": "STRING",
+ "host": {
+ "ipv4": 2130706433,
+ "port": 30100,
+ "service_name": "SERVICECENTER"
+ }
+ },
+ {
+ "key": "http.url",
+ "value":
"L3Y0L2RlZmF1bHQvcmVnaXN0cnkvbWljcm9zZXJ2aWNlcz8lM0Fwcm9qZWN0PWRlZmF1bHQm",
+ "annotation_type": "STRING",
+ "host": {
+ "ipv4": 2130706433,
+ "port": 30100,
+ "service_name": "SERVICECENTER"
+ }
+ },
+ {
+ "key": "http.path",
+ "value": "L3Y0L2RlZmF1bHQvcmVnaXN0cnkvbWljcm9zZXJ2aWNlcw==",
+ "annotation_type": "STRING",
+ "host": {
+ "ipv4": 2130706433,
+ "port": 30100,
+ "service_name": "SERVICECENTER"
+ }
+ },
+ {
+ "key": "http.host",
+ "value": "",
+ "annotation_type": "STRING",
+ "host": {
+ "ipv4": 2130706433,
+ "port": 30100,
+ "service_name": "SERVICECENTER"
+ }
+ },
+ {
+ "key": "resultCode",
+ "value": "MjAw",
+ "annotation_type": "STRING",
+ "host": {
+ "ipv4": 2130706433,
+ "port": 30100,
+ "service_name": "SERVICECENTER"
+ }
+ },
+ {
+ "key": "result",
+ "value": "MA==",
+ "annotation_type": "STRING",
+ "host": {
+ "ipv4": 2130706433,
+ "port": 30100,
+ "service_name": "SERVICECENTER"
+ }
+ },
+ {
+ "key": "http.status_code",
+ "value": "MjAw",
+ "annotation_type": "STRING",
+ "host": {
+ "ipv4": 2130706433,
+ "port": 30100,
+ "service_name": "SERVICECENTER"
+ }
+ }
+ ],
+ "timestamp": 1517455386250894,
+ "duration": 978,
+ "trace_id_high": 7511721612091346172
+}`)
+
+// TestFromZipkinSpan decodes the sample zipkincore span, converts it with
+// FromZipkinSpan and checks the fields that are copied over, then makes sure
+// the result can be marshalled to JSON.
+func TestFromZipkinSpan(t *testing.T) {
+	span := &zipkincore.Span{}
+	if err := json.Unmarshal(sample, span); err != nil {
+		t.Fatalf("TestFromZipkinSpan Unmarshal: %v", err)
+	}
+
+	s := FromZipkinSpan(span)
+	if s.Name != "api" {
+		t.Fatalf("TestFromZipkinSpan name: got %q, want %q", s.Name, "api")
+	}
+	if len(s.Annotations) != 2 {
+		t.Fatalf("TestFromZipkinSpan annotations: got %d, want 2", len(s.Annotations))
+	}
+	// The sample has nine binary annotations and none is keyed SERVER_ADDR.
+	if len(s.BinaryAnnotations) != 9 {
+		t.Fatalf("TestFromZipkinSpan binary annotations: got %d, want 9", len(s.BinaryAnnotations))
+	}
+
+	b, err := json.Marshal(s)
+	if err != nil {
+		t.Fatalf("TestFromZipkinSpan Marshal: %v", err)
+	}
+	fmt.Println(string(b))
+}
diff --git a/server/plugin/plugin.go b/server/plugin/plugin.go
index 02ffa06..308a713 100644
--- a/server/plugin/plugin.go
+++ b/server/plugin/plugin.go
@@ -25,6 +25,7 @@ import (
"github.com/apache/incubator-servicecomb-service-center/server/infra/quota"
"github.com/apache/incubator-servicecomb-service-center/server/infra/registry"
"github.com/apache/incubator-servicecomb-service-center/server/infra/security"
+
"github.com/apache/incubator-servicecomb-service-center/server/infra/tracing"
"github.com/apache/incubator-servicecomb-service-center/server/infra/uuid"
"github.com/astaxie/beego"
pg "plugin"
@@ -40,6 +41,7 @@ const (
CIPHER
QUOTA
REGISTRY
+ TRACING
typeEnd
)
@@ -50,6 +52,7 @@ var pluginNames = map[PluginName]string{
CIPHER: "cipher",
QUOTA: "quota",
REGISTRY: "registry",
+ TRACING: "trace",
}
var pluginMgr = &PluginManager{}
@@ -216,6 +219,10 @@ func (pm *PluginManager) Quota() quota.QuotaManager {
return pm.Instance(QUOTA).(quota.QuotaManager)
}
+// Tracing returns the instance of the TRACING plugin. The type assertion is
+// unchecked, so it panics if that instance does not implement
+// tracing.Tracing.
+func (pm *PluginManager) Tracing() tracing.Tracing {
+	instance := pm.Instance(TRACING)
+	return instance.(tracing.Tracing)
+}
+
func Plugins() *PluginManager {
return pluginMgr
}
diff --git a/server/service/service_suite_test.go
b/server/service/service_suite_test.go
index d9c0cb9..e64a8c4 100644
--- a/server/service/service_suite_test.go
+++ b/server/service/service_suite_test.go
@@ -21,6 +21,7 @@ import (
pb
"github.com/apache/incubator-servicecomb-service-center/server/core/proto"
_
"github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/quota/buildin"
_
"github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/registry/etcd"
+ _
"github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/tracing/buildin"
_
"github.com/apache/incubator-servicecomb-service-center/server/plugin/infra/uuid/buildin"
"github.com/apache/incubator-servicecomb-service-center/server/service"
. "github.com/onsi/ginkgo"
--
To stop receiving notification emails like this one, please contact
[email protected].