http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/heartbeater_test.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htraced/heartbeater_test.go b/htrace-htraced/go/src/htrace/htraced/heartbeater_test.go
new file mode 100644
index 0000000..9157965
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htraced/heartbeater_test.go
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+	"htrace/common"
+	"htrace/conf"
+	"testing"
+	"time"
+)
+
+func TestHeartbeaterStartupShutdown(t *testing.T) {
+	cnfBld := conf.Builder{
+		Values:   conf.TEST_VALUES(),
+		Defaults: conf.DEFAULTS,
+	}
+	cnf, err := cnfBld.Build()
+	if err != nil {
+		t.Fatalf("failed to create conf: %s", err.Error())
+	}
+	lg := common.NewLogger("heartbeater", cnf)
+	hb := NewHeartbeater("ExampleHeartbeater", 1, lg)
+	if hb.String() != "ExampleHeartbeater" {
+		t.Fatalf("hb.String() returned %s instead of %s\n", hb.String(), "ExampleHeartbeater")
+	}
+	hb.Shutdown()
+}
+
+// The number of milliseconds between heartbeats
+const HEARTBEATER_PERIOD = 5
+
+// The number of heartbeats to send in the test.
+const NUM_TEST_HEARTBEATS = 3
+
+func TestHeartbeaterSendsHeartbeats(t *testing.T) {
+	cnfBld := conf.Builder{
+		Values:   conf.TEST_VALUES(),
+		Defaults: conf.DEFAULTS,
+	}
+	cnf, err := cnfBld.Build()
+	if err != nil {
+		t.Fatalf("failed to create conf: %s", err.Error())
+	}
+	lg := common.NewLogger("heartbeater", cnf)
+	// The minimum amount of time which the heartbeater test should take
+	MINIMUM_TEST_DURATION := time.Millisecond * (NUM_TEST_HEARTBEATS * HEARTBEATER_PERIOD)
+	duration := MINIMUM_TEST_DURATION
+	for duration <= MINIMUM_TEST_DURATION {
+		start := time.Now()
+		testHeartbeaterSendsHeartbeatsImpl(t, lg)
+		end := time.Now()
+		duration = end.Sub(start)
+		lg.Debugf("Measured duration: %v; minimum expected duration: %v\n",
+			duration, MINIMUM_TEST_DURATION)
+	}
+}
+
+func testHeartbeaterSendsHeartbeatsImpl(t *testing.T, lg *common.Logger) {
+	hb := NewHeartbeater("ExampleHeartbeater", HEARTBEATER_PERIOD, lg)
+	if hb.String() != "ExampleHeartbeater" {
+		t.Fatalf("hb.String() returned %s instead of %s\n", hb.String(), "ExampleHeartbeater")
+	}
+	testChan := make(chan interface{}, NUM_TEST_HEARTBEATS)
+	gotAllHeartbeats := make(chan bool)
+	hb.AddHeartbeatTarget(&HeartbeatTarget{
+		name:       "ExampleHeartbeatTarget",
+		targetChan: testChan,
+	})
+	go func() {
+		for i := 0; i < NUM_TEST_HEARTBEATS; i++ {
+			<-testChan
+		}
+		gotAllHeartbeats <- true
+		for i := 0; i < NUM_TEST_HEARTBEATS; i++ {
+			_, open := <-testChan
+			if !open {
+				return
+			}
+		}
+	}()
+	<-gotAllHeartbeats
+	hb.Shutdown()
+}
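
For reference, the tests above show the whole Heartbeater API exercised by this patch: NewHeartbeater(name, periodMs, logger), AddHeartbeatTarget, and Shutdown. The sketch below (not part of the patch) shows how a caller inside the htraced package might drive that same API, assuming those signatures and assuming the heartbeater delivers one value on each target's targetChan per period; the payload sent on the channel is not specified by this hunk.

package main

import (
	"htrace/common"
	"htrace/conf"
)

// exampleHeartbeaterUsage is a hypothetical helper (not part of the patch).
// It registers one heartbeat target, blocks until the first heartbeat
// arrives, and then shuts the heartbeater down.
func exampleHeartbeaterUsage(cnf *conf.Config) {
	lg := common.NewLogger("heartbeater", cnf)
	beats := make(chan interface{}, 1)
	hb := NewHeartbeater("ExampleHeartbeater", 100, lg) // heartbeat every 100 ms
	hb.AddHeartbeatTarget(&HeartbeatTarget{
		name:       "exampleTarget",
		targetChan: beats,
	})
	<-beats       // wait for the first heartbeat to be delivered
	hb.Shutdown() // stop the heartbeater goroutine
}
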
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/hrpc.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/htraced/hrpc.go b/htrace-htraced/go/src/htrace/htraced/hrpc.go new file mode 100644 index 0000000..8b5a728 --- /dev/null +++ b/htrace-htraced/go/src/htrace/htraced/hrpc.go @@ -0,0 +1,386 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package main + +import ( + "bufio" + "bytes" + "encoding/binary" + "encoding/json" + "errors" + "fmt" + "github.com/ugorji/go/codec" + "htrace/common" + "htrace/conf" + "io" + "net" + "net/rpc" + "sync" + "sync/atomic" + "time" +) + +const MAX_HRPC_HANDLERS = 32765 + +// Handles HRPC calls +type HrpcHandler struct { + lg *common.Logger + store *dataStore +} + +// The HRPC server +type HrpcServer struct { + *rpc.Server + hand *HrpcHandler + + // The listener we are using to accept new connections. + listener net.Listener + + // A WaitGroup used to block until the HRPC server has exited. + exited sync.WaitGroup + + // A channel containing server codecs to use. This channel is fully + // buffered. The number of entries it initially contains determines how + // many concurrent codecs we will have running at once. + cdcs chan *HrpcServerCodec + + // Used to shut down + shutdown chan interface{} + + // The I/O timeout to use when reading requests or sending responses. This + // timeout does not apply to the time we spend processing the message. + ioTimeo time.Duration + + // A count of all I/O errors that we have encountered since the server + // started. This counts errors like improperly formatted message frames, + // but not errors like properly formatted but invalid messages. + // This count is updated from multiple goroutines via sync/atomic. + ioErrorCount uint64 + + // The test hooks to use, or nil during normal operation. + testHooks *hrpcTestHooks +} + +type hrpcTestHooks struct { + // A callback we make right after calling Accept() but before reading from + // the new connection. + HandleAdmission func() +} + +// A codec which encodes HRPC data via JSON. This structure holds the context +// for a particular client connection. +type HrpcServerCodec struct { + lg *common.Logger + + // The current connection. + conn net.Conn + + // The HrpcServer which this connection is part of. + hsv *HrpcServer + + // The message length we read from the header. + length uint32 + + // The number of messages this connection has handled. + numHandled int + + // The buffer for reading requests. These buffers are reused for multiple + // requests to avoid allocating memory. 
+ buf []byte + + // Configuration for msgpack decoding + msgpackHandle codec.MsgpackHandle +} + +func asJson(val interface{}) string { + js, err := json.Marshal(val) + if err != nil { + return "encoding error: " + err.Error() + } + return string(js) +} + +func newIoErrorWarn(cdc *HrpcServerCodec, val string) error { + return newIoError(cdc, val, common.WARN) +} + +func newIoError(cdc *HrpcServerCodec, val string, level common.Level) error { + if cdc.lg.LevelEnabled(level) { + cdc.lg.Write(level, cdc.conn.RemoteAddr().String()+": "+val+"\n") + } + if level >= common.INFO { + atomic.AddUint64(&cdc.hsv.ioErrorCount, 1) + } + return errors.New(val) +} + +func (cdc *HrpcServerCodec) ReadRequestHeader(req *rpc.Request) error { + hdr := common.HrpcRequestHeader{} + if cdc.lg.TraceEnabled() { + cdc.lg.Tracef("%s: Reading HRPC request header.\n", cdc.conn.RemoteAddr()) + } + cdc.conn.SetDeadline(time.Now().Add(cdc.hsv.ioTimeo)) + err := binary.Read(cdc.conn, binary.LittleEndian, &hdr) + if err != nil { + if err == io.EOF && cdc.numHandled > 0 { + return newIoError(cdc, fmt.Sprintf("Remote closed connection "+ + "after writing %d message(s)", cdc.numHandled), common.DEBUG) + } + return newIoError(cdc, + fmt.Sprintf("Error reading request header: %s", err.Error()), common.WARN) + } + if cdc.lg.TraceEnabled() { + cdc.lg.Tracef("%s: Read HRPC request header %s\n", + cdc.conn.RemoteAddr(), asJson(&hdr)) + } + if hdr.Magic != common.HRPC_MAGIC { + return newIoErrorWarn(cdc, fmt.Sprintf("Invalid request header: expected "+ + "magic number of 0x%04x, but got 0x%04x", common.HRPC_MAGIC, hdr.Magic)) + } + if hdr.Length > common.MAX_HRPC_BODY_LENGTH { + return newIoErrorWarn(cdc, fmt.Sprintf("Length prefix was too long. "+ + "Maximum length is %d, but we got %d.", common.MAX_HRPC_BODY_LENGTH, + hdr.Length)) + } + req.ServiceMethod = common.HrpcMethodIdToMethodName(hdr.MethodId) + if req.ServiceMethod == "" { + return newIoErrorWarn(cdc, fmt.Sprintf("Unknown MethodID code 0x%04x", + hdr.MethodId)) + } + req.Seq = hdr.Seq + cdc.length = hdr.Length + return nil +} + +func (cdc *HrpcServerCodec) ReadRequestBody(body interface{}) error { + remoteAddr := cdc.conn.RemoteAddr().String() + if cdc.lg.TraceEnabled() { + cdc.lg.Tracef("%s: Reading HRPC %d-byte request body.\n", + remoteAddr, cdc.length) + } + if cap(cdc.buf) < int(cdc.length) { + var pow uint + for pow = 0; (1 << pow) < int(cdc.length); pow++ { + } + cdc.buf = make([]byte, 0, 1<<pow) + } + _, err := io.ReadFull(cdc.conn, cdc.buf[:cdc.length]) + if err != nil { + return newIoErrorWarn(cdc, fmt.Sprintf("Failed to read %d-byte "+ + "request body: %s", cdc.length, err.Error())) + } + var zeroTime time.Time + cdc.conn.SetDeadline(zeroTime) + + dec := codec.NewDecoderBytes(cdc.buf[:cdc.length], &cdc.msgpackHandle) + err = dec.Decode(body) + if cdc.lg.TraceEnabled() { + cdc.lg.Tracef("%s: read HRPC message: %s\n", + remoteAddr, asJson(&body)) + } + req := body.(*common.WriteSpansReq) + if req == nil { + return nil + } + // We decode WriteSpans requests in a streaming fashion, to avoid overloading the garbage + // collector with a ton of trace spans all at once. 
+ startTime := time.Now() + client, _, err := net.SplitHostPort(remoteAddr) + if err != nil { + return newIoErrorWarn(cdc, fmt.Sprintf("Failed to split host and port "+ + "for %s: %s\n", remoteAddr, err.Error())) + } + hand := cdc.hsv.hand + ing := hand.store.NewSpanIngestor(hand.lg, client, req.DefaultTrid) + for spanIdx := 0; spanIdx < req.NumSpans; spanIdx++ { + var span *common.Span + err := dec.Decode(&span) + if err != nil { + return newIoErrorWarn(cdc, fmt.Sprintf("Failed to decode span %d "+ + "out of %d: %s\n", spanIdx, req.NumSpans, err.Error())) + } + ing.IngestSpan(span) + } + ing.Close(startTime) + return nil +} + +var EMPTY []byte = make([]byte, 0) + +func (cdc *HrpcServerCodec) WriteResponse(resp *rpc.Response, msg interface{}) error { + cdc.conn.SetDeadline(time.Now().Add(cdc.hsv.ioTimeo)) + var err error + buf := EMPTY + if msg != nil { + w := bytes.NewBuffer(make([]byte, 0, 128)) + enc := codec.NewEncoder(w, &cdc.msgpackHandle) + err := enc.Encode(msg) + if err != nil { + return newIoErrorWarn(cdc, fmt.Sprintf("Failed to marshal "+ + "response message: %s", err.Error())) + } + buf = w.Bytes() + } + hdr := common.HrpcResponseHeader{} + hdr.MethodId = common.HrpcMethodNameToId(resp.ServiceMethod) + hdr.Seq = resp.Seq + hdr.ErrLength = uint32(len(resp.Error)) + hdr.Length = uint32(len(buf)) + writer := bufio.NewWriterSize(cdc.conn, 256) + err = binary.Write(writer, binary.LittleEndian, &hdr) + if err != nil { + return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write response "+ + "header: %s", err.Error())) + } + if hdr.ErrLength > 0 { + _, err = io.WriteString(writer, resp.Error) + if err != nil { + return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write error "+ + "string: %s", err.Error())) + } + } + if hdr.Length > 0 { + var length int + length, err = writer.Write(buf) + if err != nil { + return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write response "+ + "message: %s", err.Error())) + } + if uint32(length) != hdr.Length { + return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write all of "+ + "response message: %s", err.Error())) + } + } + err = writer.Flush() + if err != nil { + return newIoErrorWarn(cdc, fmt.Sprintf("Failed to write the response "+ + "bytes: %s", err.Error())) + } + cdc.numHandled++ + return nil +} + +func (cdc *HrpcServerCodec) Close() error { + err := cdc.conn.Close() + cdc.conn = nil + cdc.length = 0 + cdc.numHandled = 0 + cdc.hsv.cdcs <- cdc + return err +} + +func (hand *HrpcHandler) WriteSpans(req *common.WriteSpansReq, + resp *common.WriteSpansResp) (err error) { + // Nothing to do here; WriteSpans is handled in ReadRequestBody. 
+ return nil +} + +func CreateHrpcServer(cnf *conf.Config, store *dataStore, + testHooks *hrpcTestHooks) (*HrpcServer, error) { + lg := common.NewLogger("hrpc", cnf) + numHandlers := cnf.GetInt(conf.HTRACE_NUM_HRPC_HANDLERS) + if numHandlers < 1 { + lg.Warnf("%s must be positive: using 1 handler.\n", conf.HTRACE_NUM_HRPC_HANDLERS) + numHandlers = 1 + } + if numHandlers > MAX_HRPC_HANDLERS { + lg.Warnf("%s cannot be more than %d: using %d handlers\n", + conf.HTRACE_NUM_HRPC_HANDLERS, MAX_HRPC_HANDLERS, MAX_HRPC_HANDLERS) + numHandlers = MAX_HRPC_HANDLERS + } + hsv := &HrpcServer{ + Server: rpc.NewServer(), + hand: &HrpcHandler{ + lg: lg, + store: store, + }, + cdcs: make(chan *HrpcServerCodec, numHandlers), + shutdown: make(chan interface{}), + ioTimeo: time.Millisecond * + time.Duration(cnf.GetInt64(conf.HTRACE_HRPC_IO_TIMEOUT_MS)), + testHooks: testHooks, + } + for i := 0; i < numHandlers; i++ { + hsv.cdcs <- &HrpcServerCodec{ + lg: lg, + hsv: hsv, + msgpackHandle: codec.MsgpackHandle{ + WriteExt: true, + }, + } + } + var err error + hsv.listener, err = net.Listen("tcp", cnf.Get(conf.HTRACE_HRPC_ADDRESS)) + if err != nil { + return nil, err + } + hsv.Server.Register(hsv.hand) + hsv.exited.Add(1) + go hsv.run() + lg.Infof("Started HRPC server on %s with %d handler routines. "+ + "ioTimeo=%s.\n", hsv.listener.Addr().String(), numHandlers, + hsv.ioTimeo.String()) + return hsv, nil +} + +func (hsv *HrpcServer) run() { + lg := hsv.hand.lg + srvAddr := hsv.listener.Addr().String() + defer func() { + lg.Infof("HrpcServer on %s exiting\n", srvAddr) + hsv.exited.Done() + }() + for { + select { + case cdc := <-hsv.cdcs: + conn, err := hsv.listener.Accept() + if err != nil { + lg.Errorf("HrpcServer on %s got accept error: %s\n", srvAddr, err.Error()) + hsv.cdcs <- cdc // never blocks; there is always sufficient buffer space + continue + } + if lg.TraceEnabled() { + lg.Tracef("%s: Accepted HRPC connection.\n", conn.RemoteAddr()) + } + cdc.conn = conn + cdc.numHandled = 0 + if hsv.testHooks != nil && hsv.testHooks.HandleAdmission != nil { + hsv.testHooks.HandleAdmission() + } + go hsv.ServeCodec(cdc) + case <-hsv.shutdown: + return + } + } +} + +func (hsv *HrpcServer) Addr() net.Addr { + return hsv.listener.Addr() +} + +func (hsv *HrpcServer) GetNumIoErrors() uint64 { + return atomic.LoadUint64(&hsv.ioErrorCount) +} + +func (hsv *HrpcServer) Close() { + close(hsv.shutdown) + hsv.listener.Close() + hsv.exited.Wait() +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/htraced.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/htraced/htraced.go b/htrace-htraced/go/src/htrace/htraced/htraced.go new file mode 100644 index 0000000..0d41e0d --- /dev/null +++ b/htrace-htraced/go/src/htrace/htraced/htraced.go @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package main + +import ( + "bufio" + "encoding/json" + "fmt" + "github.com/alecthomas/kingpin" + "github.com/jmhodges/levigo" + "htrace/common" + "htrace/conf" + "net" + "os" + "runtime" + "time" +) + +var RELEASE_VERSION string +var GIT_VERSION string + +const USAGE = `htraced: the HTrace server daemon. + +htraced receives trace spans sent from HTrace clients. It exposes a REST +interface which others can query. It also runs a web server with a graphical +user interface. htraced stores its span data in levelDB files on the local +disks. + +Usage: +--help: this help message + +-Dk=v: set configuration key 'k' to value 'v' +For example -Dweb.address=127.0.0.1:8080 sets the web address to localhost, +port 8080. -Dlog.level=DEBUG will set the default log level to DEBUG. + +-Dk: set configuration key 'k' to 'true' + +Normally, configuration options should be set in the ` + conf.CONFIG_FILE_NAME + ` +configuration file. We find this file by searching the paths in the +` + conf.HTRACED_CONF_DIR + `. The command-line options are just an alternate way +of setting configuration when launching the daemon. +` + +func main() { + // Load the htraced configuration. + // This also parses the -Dfoo=bar command line arguments and removes them + // from os.Argv. + cnf, cnfLog := conf.LoadApplicationConfig("htraced.") + + // Parse the remaining command-line arguments. + app := kingpin.New(os.Args[0], USAGE) + version := app.Command("version", "Print server version and exit.") + cmd := kingpin.MustParse(app.Parse(os.Args[1:])) + + // Handle the "version" command-line argument. + if cmd == version.FullCommand() { + fmt.Printf("Running htraced %s [%s].\n", RELEASE_VERSION, GIT_VERSION) + os.Exit(0) + } + + // Open the HTTP port. + // We want to do this first, before initializing the datastore or setting up + // logging. That way, if someone accidentally starts two daemons with the + // same config file, the second invocation will exit with a "port in use" + // error rather than potentially disrupting the first invocation. + rstListener, listenErr := net.Listen("tcp", cnf.Get(conf.HTRACE_WEB_ADDRESS)) + if listenErr != nil { + fmt.Fprintf(os.Stderr, "Error opening HTTP port: %s\n", + listenErr.Error()) + os.Exit(1) + } + + // Print out the startup banner and information about the daemon + // configuration. + lg := common.NewLogger("main", cnf) + defer lg.Close() + lg.Infof("*** Starting htraced %s [%s]***\n", RELEASE_VERSION, GIT_VERSION) + scanner := bufio.NewScanner(cnfLog) + for scanner.Scan() { + lg.Infof(scanner.Text() + "\n") + } + common.InstallSignalHandlers(cnf) + if runtime.GOMAXPROCS(0) == 1 { + ncpu := runtime.NumCPU() + runtime.GOMAXPROCS(ncpu) + lg.Infof("setting GOMAXPROCS=%d\n", ncpu) + } else { + lg.Infof("GOMAXPROCS=%d\n", runtime.GOMAXPROCS(0)) + } + lg.Infof("leveldb version=%d.%d\n", + levigo.GetLevelDBMajorVersion(), levigo.GetLevelDBMinorVersion()) + + // Initialize the datastore. 
+ store, err := CreateDataStore(cnf, nil) + if err != nil { + lg.Errorf("Error creating datastore: %s\n", err.Error()) + os.Exit(1) + } + var rsv *RestServer + rsv, err = CreateRestServer(cnf, store, rstListener) + if err != nil { + lg.Errorf("Error creating REST server: %s\n", err.Error()) + os.Exit(1) + } + var hsv *HrpcServer + if cnf.Get(conf.HTRACE_HRPC_ADDRESS) != "" { + hsv, err = CreateHrpcServer(cnf, store, nil) + if err != nil { + lg.Errorf("Error creating HRPC server: %s\n", err.Error()) + os.Exit(1) + } + } else { + lg.Infof("Not starting HRPC server because no value was given for %s.\n", + conf.HTRACE_HRPC_ADDRESS) + } + naddr := cnf.Get(conf.HTRACE_STARTUP_NOTIFICATION_ADDRESS) + if naddr != "" { + notif := StartupNotification{ + HttpAddr: rsv.Addr().String(), + ProcessId: os.Getpid(), + } + if hsv != nil { + notif.HrpcAddr = hsv.Addr().String() + } + err = sendStartupNotification(naddr, ¬if) + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to send startup notification: "+ + "%s\n", err.Error()) + os.Exit(1) + } + } + for { + time.Sleep(time.Duration(10) * time.Hour) + } +} + +// A startup notification message that we optionally send on startup. +// Used by unit tests. +type StartupNotification struct { + HttpAddr string + HrpcAddr string + ProcessId int +} + +func sendStartupNotification(naddr string, notif *StartupNotification) error { + conn, err := net.Dial("tcp", naddr) + if err != nil { + return err + } + defer func() { + if conn != nil { + conn.Close() + } + }() + var buf []byte + buf, err = json.Marshal(notif) + if err != nil { + return err + } + _, err = conn.Write(buf) + conn.Close() + conn = nil + return nil +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/loader.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/htraced/loader.go b/htrace-htraced/go/src/htrace/htraced/loader.go new file mode 100644 index 0000000..95c5c3e --- /dev/null +++ b/htrace-htraced/go/src/htrace/htraced/loader.go @@ -0,0 +1,511 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package main + +import ( + "bytes" + "errors" + "fmt" + "github.com/jmhodges/levigo" + "github.com/ugorji/go/codec" + "htrace/common" + "htrace/conf" + "io" + "math" + "math/rand" + "os" + "strings" + "syscall" + "time" +) + +// Routines for loading the datastore. + +// The leveldb key which has information about the shard. +const SHARD_INFO_KEY = 'w' + +// A constant signifying that we don't know what the layout version is. +const UNKNOWN_LAYOUT_VERSION = 0 + +// The current layout version. We cannot read layout versions newer than this. +// We may sometimes be able to read older versions, but only by doing an +// upgrade. 
+const CURRENT_LAYOUT_VERSION = 3 + +type DataStoreLoader struct { + // The dataStore logger. + lg *common.Logger + + // True if we should clear the stored data. + ClearStored bool + + // The shards that we're loading + shards []*ShardLoader + + // The options to use for opening datastores in LevelDB. + openOpts *levigo.Options + + // The read options to use for LevelDB. + readOpts *levigo.ReadOptions + + // The write options to use for LevelDB. + writeOpts *levigo.WriteOptions +} + +// Information about a Shard. +type ShardInfo struct { + // The layout version of the datastore. + // We should always keep this field so that old software can recognize new + // layout versions, even if it can't read them. + LayoutVersion uint64 + + // A random number identifying this daemon. + DaemonId uint64 + + // The total number of shards in this datastore. + TotalShards uint32 + + // The index of this shard within the datastore. + ShardIndex uint32 +} + +// Create a new datastore loader. +// Initializes the loader, but does not load any leveldb instances. +func NewDataStoreLoader(cnf *conf.Config) *DataStoreLoader { + dld := &DataStoreLoader{ + lg: common.NewLogger("datastore", cnf), + ClearStored: cnf.GetBool(conf.HTRACE_DATA_STORE_CLEAR), + } + dld.readOpts = levigo.NewReadOptions() + dld.readOpts.SetFillCache(true) + dld.readOpts.SetVerifyChecksums(false) + dld.writeOpts = levigo.NewWriteOptions() + dld.writeOpts.SetSync(false) + dirsStr := cnf.Get(conf.HTRACE_DATA_STORE_DIRECTORIES) + rdirs := strings.Split(dirsStr, conf.PATH_LIST_SEP) + // Filter out empty entries + dirs := make([]string, 0, len(rdirs)) + for i := range rdirs { + if strings.TrimSpace(rdirs[i]) != "" { + dirs = append(dirs, rdirs[i]) + } + } + dld.shards = make([]*ShardLoader, len(dirs)) + for i := range dirs { + dld.shards[i] = &ShardLoader{ + dld: dld, + path: dirs[i] + conf.PATH_SEP + "db", + } + } + dld.openOpts = levigo.NewOptions() + cacheSize := cnf.GetInt(conf.HTRACE_LEVELDB_CACHE_SIZE) + dld.openOpts.SetCache(levigo.NewLRUCache(cacheSize)) + dld.openOpts.SetParanoidChecks(false) + writeBufferSize := cnf.GetInt(conf.HTRACE_LEVELDB_WRITE_BUFFER_SIZE) + if writeBufferSize > 0 { + dld.openOpts.SetWriteBufferSize(writeBufferSize) + } + maxFdPerShard := dld.calculateMaxOpenFilesPerShard() + if maxFdPerShard > 0 { + dld.openOpts.SetMaxOpenFiles(maxFdPerShard) + } + return dld +} + +func (dld *DataStoreLoader) Close() { + if dld.lg != nil { + dld.lg.Close() + dld.lg = nil + } + if dld.openOpts != nil { + dld.openOpts.Close() + dld.openOpts = nil + } + if dld.readOpts != nil { + dld.readOpts.Close() + dld.readOpts = nil + } + if dld.writeOpts != nil { + dld.writeOpts.Close() + dld.writeOpts = nil + } + if dld.shards != nil { + for i := range dld.shards { + if dld.shards[i] != nil { + dld.shards[i].Close() + } + } + dld.shards = nil + } +} + +func (dld *DataStoreLoader) DisownResources() { + dld.lg = nil + dld.openOpts = nil + dld.readOpts = nil + dld.writeOpts = nil + dld.shards = nil +} + +// The maximum number of file descriptors we'll use on non-datastore things. +const NON_DATASTORE_FD_MAX = 300 + +// The minimum number of file descriptors per shard we will set. Setting fewer +// than this number could trigger a bug in some early versions of leveldb. 
+const MIN_FDS_PER_SHARD = 80 + +func (dld *DataStoreLoader) calculateMaxOpenFilesPerShard() int { + var rlim syscall.Rlimit + err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rlim) + if err != nil { + dld.lg.Warnf("Unable to calculate maximum open files per shard: "+ + "getrlimit failed: %s\n", err.Error()) + return 0 + } + // I think RLIMIT_NOFILE fits in 32 bits on all known operating systems, + // but there's no harm in being careful. 'int' in golang always holds at + // least 32 bits. + var maxFd int + if rlim.Cur > uint64(math.MaxInt32) { + maxFd = math.MaxInt32 + } else { + maxFd = int(rlim.Cur) + } + if len(dld.shards) == 0 { + dld.lg.Warnf("Unable to calculate maximum open files per shard, " + + "since there are 0 shards configured.\n") + return 0 + } + fdsPerShard := (maxFd - NON_DATASTORE_FD_MAX) / len(dld.shards) + if fdsPerShard < MIN_FDS_PER_SHARD { + dld.lg.Warnf("Expected to be able to use at least %d "+ + "fds per shard, but we have %d shards and %d total fds to allocate, "+ + "giving us only %d FDs per shard.", MIN_FDS_PER_SHARD, + len(dld.shards), maxFd-NON_DATASTORE_FD_MAX, fdsPerShard) + return 0 + } + dld.lg.Infof("maxFd = %d. Setting maxFdPerShard = %d\n", + maxFd, fdsPerShard) + return fdsPerShard +} + +// Load information about all shards. +func (dld *DataStoreLoader) LoadShards() { + for i := range dld.shards { + shd := dld.shards[i] + shd.load() + } +} + +// Verify that the shard infos are consistent. +// Reorders the shardInfo structures based on their ShardIndex. +func (dld *DataStoreLoader) VerifyShardInfos() error { + if len(dld.shards) < 1 { + return errors.New("No shard directories found.") + } + // Make sure no shards had errors. + for i := range dld.shards { + shd := dld.shards[i] + if shd.infoErr != nil { + return shd.infoErr + } + } + // Make sure that if any shards are empty, all shards are empty. + emptyShards := "" + prefix := "" + for i := range dld.shards { + if dld.shards[i].info == nil { + emptyShards = prefix + dld.shards[i].path + prefix = ", " + } + } + if emptyShards != "" { + for i := range dld.shards { + if dld.shards[i].info != nil { + return errors.New(fmt.Sprintf("Shards %s were empty, but "+ + "the other shards had data.", emptyShards)) + } + } + // All shards are empty. + return nil + } + // Make sure that all shards have the same layout version, daemonId, and number of total + // shards. + layoutVersion := dld.shards[0].info.LayoutVersion + daemonId := dld.shards[0].info.DaemonId + totalShards := dld.shards[0].info.TotalShards + for i := 1; i < len(dld.shards); i++ { + shd := dld.shards[i] + if layoutVersion != shd.info.LayoutVersion { + return errors.New(fmt.Sprintf("Layout version mismatch. Shard "+ + "%s has layout version 0x%016x, but shard %s has layout "+ + "version 0x%016x.", + dld.shards[0].path, layoutVersion, shd.path, shd.info.LayoutVersion)) + } + if daemonId != shd.info.DaemonId { + return errors.New(fmt.Sprintf("DaemonId mismatch. Shard %s has "+ + "daemonId 0x%016x, but shard %s has daemonId 0x%016x.", + dld.shards[0].path, daemonId, shd.path, shd.info.DaemonId)) + } + if totalShards != shd.info.TotalShards { + return errors.New(fmt.Sprintf("TotalShards mismatch. Shard %s has "+ + "TotalShards = %d, but shard %s has TotalShards = %d.", + dld.shards[0].path, totalShards, shd.path, shd.info.TotalShards)) + } + if shd.info.ShardIndex >= totalShards { + return errors.New(fmt.Sprintf("Invalid ShardIndex. 
Shard %s has "+ + "ShardIndex = %d, but TotalShards = %d.", + shd.path, shd.info.ShardIndex, shd.info.TotalShards)) + } + } + if layoutVersion != CURRENT_LAYOUT_VERSION { + return errors.New(fmt.Sprintf("The layout version of all shards "+ + "is %d, but we only support version %d.", + layoutVersion, CURRENT_LAYOUT_VERSION)) + } + if totalShards != uint32(len(dld.shards)) { + return errors.New(fmt.Sprintf("The TotalShards field of all shards "+ + "is %d, but we have %d shards.", totalShards, len(dld.shards))) + } + // Reorder shards in order of their ShardIndex. + reorderedShards := make([]*ShardLoader, len(dld.shards)) + for i := 0; i < len(dld.shards); i++ { + shd := dld.shards[i] + shardIdx := shd.info.ShardIndex + if reorderedShards[shardIdx] != nil { + return errors.New(fmt.Sprintf("Both shard %s and "+ + "shard %s have ShardIndex %d.", shd.path, + reorderedShards[shardIdx].path, shardIdx)) + } + reorderedShards[shardIdx] = shd + } + dld.shards = reorderedShards + return nil +} + +func (dld *DataStoreLoader) Load() error { + var err error + // If data.store.clear was set, clear existing data. + if dld.ClearStored { + err = dld.clearStored() + if err != nil { + return err + } + } + // Make sure the shard directories exist in all cases, with a mkdir -p + for i := range dld.shards { + err := os.MkdirAll(dld.shards[i].path, 0777) + if err != nil { + return errors.New(fmt.Sprintf("Failed to MkdirAll(%s): %s", + dld.shards[i].path, err.Error())) + } + } + // Get information about each shard, and verify them. + dld.LoadShards() + err = dld.VerifyShardInfos() + if err != nil { + return err + } + if dld.shards[0].ldb != nil { + dld.lg.Infof("Loaded %d leveldb instances with "+ + "DaemonId of 0x%016x\n", len(dld.shards), + dld.shards[0].info.DaemonId) + } else { + // Create leveldb instances if needed. 
+ rnd := rand.New(rand.NewSource(time.Now().UnixNano())) + daemonId := uint64(rnd.Int63()) + dld.lg.Infof("Initializing %d leveldb instances with a new "+ + "DaemonId of 0x%016x\n", len(dld.shards), daemonId) + dld.openOpts.SetCreateIfMissing(true) + for i := range dld.shards { + shd := dld.shards[i] + shd.ldb, err = levigo.Open(shd.path, shd.dld.openOpts) + if err != nil { + return errors.New(fmt.Sprintf("levigo.Open(%s) failed to "+ + "create the shard: %s", shd.path, err.Error())) + } + info := &ShardInfo{ + LayoutVersion: CURRENT_LAYOUT_VERSION, + DaemonId: daemonId, + TotalShards: uint32(len(dld.shards)), + ShardIndex: uint32(i), + } + err = shd.writeShardInfo(info) + if err != nil { + return errors.New(fmt.Sprintf("levigo.Open(%s) failed to "+ + "write shard info: %s", shd.path, err.Error())) + } + dld.lg.Infof("Shard %s initialized with ShardInfo %s \n", + shd.path, asJson(info)) + } + } + return nil +} + +func (dld *DataStoreLoader) clearStored() error { + for i := range dld.shards { + path := dld.shards[i].path + fi, err := os.Stat(path) + if err != nil && !os.IsNotExist(err) { + dld.lg.Errorf("Failed to stat %s: %s\n", path, err.Error()) + return err + } + if fi != nil { + err = os.RemoveAll(path) + if err != nil { + dld.lg.Errorf("Failed to clear existing datastore directory %s: %s\n", + path, err.Error()) + return err + } + dld.lg.Infof("Cleared existing datastore directory %s\n", path) + } + } + return nil +} + +type ShardLoader struct { + // The parent DataStoreLoader + dld *DataStoreLoader + + // Path to the shard + path string + + // Leveldb instance of the shard + ldb *levigo.DB + + // Information about the shard + info *ShardInfo + + // If non-null, the error we encountered trying to load the shard info. + infoErr error +} + +func (shd *ShardLoader) Close() { + if shd.ldb != nil { + shd.ldb.Close() + shd.ldb = nil + } +} + +// Load information about a particular shard. +func (shd *ShardLoader) load() { + shd.info = nil + fi, err := os.Stat(shd.path) + if err != nil { + if os.IsNotExist(err) { + shd.infoErr = nil + return + } + shd.infoErr = errors.New(fmt.Sprintf( + "stat() error on leveldb directory "+ + "%s: %s", shd.path, err.Error())) + return + } + if !fi.Mode().IsDir() { + shd.infoErr = errors.New(fmt.Sprintf( + "stat() error on leveldb directory "+ + "%s: inode is not directory.", shd.path)) + return + } + var dbDir *os.File + dbDir, err = os.Open(shd.path) + if err != nil { + shd.infoErr = errors.New(fmt.Sprintf( + "open() error on leveldb directory "+ + "%s: %s.", shd.path, err.Error())) + return + } + defer func() { + if dbDir != nil { + dbDir.Close() + } + }() + _, err = dbDir.Readdirnames(1) + if err != nil { + if err == io.EOF { + // The db directory is empty. 
+ shd.infoErr = nil + return + } + shd.infoErr = errors.New(fmt.Sprintf( + "Readdirnames() error on leveldb directory "+ + "%s: %s.", shd.path, err.Error())) + return + } + dbDir.Close() + dbDir = nil + shd.ldb, err = levigo.Open(shd.path, shd.dld.openOpts) + if err != nil { + shd.ldb = nil + shd.infoErr = errors.New(fmt.Sprintf( + "levigo.Open() error on leveldb directory "+ + "%s: %s.", shd.path, err.Error())) + return + } + shd.info, err = shd.readShardInfo() + if err != nil { + shd.infoErr = err + return + } + shd.infoErr = nil +} + +func (shd *ShardLoader) readShardInfo() (*ShardInfo, error) { + buf, err := shd.ldb.Get(shd.dld.readOpts, []byte{SHARD_INFO_KEY}) + if err != nil { + return nil, errors.New(fmt.Sprintf("readShardInfo(%s): failed to "+ + "read shard info key: %s", shd.path, err.Error())) + } + if len(buf) == 0 { + return nil, errors.New(fmt.Sprintf("readShardInfo(%s): got zero-"+ + "length value for shard info key.", shd.path)) + } + mh := new(codec.MsgpackHandle) + mh.WriteExt = true + r := bytes.NewBuffer(buf) + decoder := codec.NewDecoder(r, mh) + shardInfo := &ShardInfo{ + LayoutVersion: UNKNOWN_LAYOUT_VERSION, + } + err = decoder.Decode(shardInfo) + if err != nil { + return nil, errors.New(fmt.Sprintf("readShardInfo(%s): msgpack "+ + "decoding failed for shard info key: %s", shd.path, err.Error())) + } + return shardInfo, nil +} + +func (shd *ShardLoader) writeShardInfo(info *ShardInfo) error { + mh := new(codec.MsgpackHandle) + mh.WriteExt = true + w := new(bytes.Buffer) + enc := codec.NewEncoder(w, mh) + err := enc.Encode(info) + if err != nil { + return errors.New(fmt.Sprintf("msgpack encoding error: %s", + err.Error())) + } + err = shd.ldb.Put(shd.dld.writeOpts, []byte{SHARD_INFO_KEY}, w.Bytes()) + if err != nil { + return errors.New(fmt.Sprintf("leveldb write error: %s", + err.Error())) + } + return nil +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/metrics.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/htraced/metrics.go b/htrace-htraced/go/src/htrace/htraced/metrics.go new file mode 100644 index 0000000..d2feca8 --- /dev/null +++ b/htrace-htraced/go/src/htrace/htraced/metrics.go @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package main + +import ( + "htrace/common" + "htrace/conf" + "math" + "sync" + "time" +) + +// +// The Metrics Sink for HTraced. +// +// The Metrics sink keeps track of metrics for the htraced daemon. +// It is important to have good metrics so that we can properly manager htraced. In particular, we +// need to know what rate we are receiving spans at, the main places spans came from. 
If spans +// were dropped because of a high sampling rates, we need to know which part of the system dropped +// them so that we can adjust the sampling rate there. +// + +const LATENCY_CIRC_BUF_SIZE = 4096 + +type MetricsSink struct { + // The metrics sink logger. + lg *common.Logger + + // The maximum number of entries we shuld allow in the HostSpanMetrics map. + maxMtx int + + // The total number of spans ingested by the server (counting dropped spans) + IngestedSpans uint64 + + // The total number of spans written to leveldb since the server started. + WrittenSpans uint64 + + // The total number of spans dropped by the server. + ServerDropped uint64 + + // Per-host Span Metrics + HostSpanMetrics common.SpanMetricsMap + + // The last few writeSpan latencies + wsLatencyCircBuf *CircBufU32 + + // Lock protecting all metrics + lock sync.Mutex +} + +func NewMetricsSink(cnf *conf.Config) *MetricsSink { + return &MetricsSink{ + lg: common.NewLogger("metrics", cnf), + maxMtx: cnf.GetInt(conf.HTRACE_METRICS_MAX_ADDR_ENTRIES), + HostSpanMetrics: make(common.SpanMetricsMap), + wsLatencyCircBuf: NewCircBufU32(LATENCY_CIRC_BUF_SIZE), + } +} + +// Update the total number of spans which were ingested, as well as other +// metrics that get updated during span ingest. +func (msink *MetricsSink) UpdateIngested(addr string, totalIngested int, + serverDropped int, wsLatency time.Duration) { + msink.lock.Lock() + defer msink.lock.Unlock() + msink.IngestedSpans += uint64(totalIngested) + msink.ServerDropped += uint64(serverDropped) + msink.updateSpanMetrics(addr, 0, serverDropped) + wsLatencyMs := wsLatency.Nanoseconds() / 1000000 + var wsLatency32 uint32 + if wsLatencyMs > math.MaxUint32 { + wsLatency32 = math.MaxUint32 + } else { + wsLatency32 = uint32(wsLatencyMs) + } + msink.wsLatencyCircBuf.Append(wsLatency32) +} + +// Update the per-host span metrics. Must be called with the lock held. +func (msink *MetricsSink) updateSpanMetrics(addr string, numWritten int, + serverDropped int) { + mtx, found := msink.HostSpanMetrics[addr] + if !found { + // Ensure that the per-host span metrics map doesn't grow too large. + if len(msink.HostSpanMetrics) >= msink.maxMtx { + // Delete a random entry + for k := range msink.HostSpanMetrics { + msink.lg.Warnf("Evicting metrics entry for addr %s "+ + "because there are more than %d addrs.\n", k, msink.maxMtx) + delete(msink.HostSpanMetrics, k) + break + } + } + mtx = &common.SpanMetrics{} + msink.HostSpanMetrics[addr] = mtx + } + mtx.Written += uint64(numWritten) + mtx.ServerDropped += uint64(serverDropped) +} + +// Update the total number of spans which were persisted to disk. +func (msink *MetricsSink) UpdatePersisted(addr string, totalWritten int, + serverDropped int) { + msink.lock.Lock() + defer msink.lock.Unlock() + msink.WrittenSpans += uint64(totalWritten) + msink.ServerDropped += uint64(serverDropped) + msink.updateSpanMetrics(addr, totalWritten, serverDropped) +} + +// Read the server stats. 
+func (msink *MetricsSink) PopulateServerStats(stats *common.ServerStats) { + msink.lock.Lock() + defer msink.lock.Unlock() + stats.IngestedSpans = msink.IngestedSpans + stats.WrittenSpans = msink.WrittenSpans + stats.ServerDroppedSpans = msink.ServerDropped + stats.MaxWriteSpansLatencyMs = msink.wsLatencyCircBuf.Max() + stats.AverageWriteSpansLatencyMs = msink.wsLatencyCircBuf.Average() + stats.HostSpanMetrics = make(common.SpanMetricsMap) + for k, v := range msink.HostSpanMetrics { + stats.HostSpanMetrics[k] = &common.SpanMetrics{ + Written: v.Written, + ServerDropped: v.ServerDropped, + } + } +} + +// A circular buffer of uint32s which supports appending and taking the +// average, and some other things. +type CircBufU32 struct { + // The next slot to fill + slot int + + // The number of slots which are in use. This number only ever + // increases until the buffer is full. + slotsUsed int + + // The buffer + buf []uint32 +} + +func NewCircBufU32(size int) *CircBufU32 { + return &CircBufU32{ + slotsUsed: -1, + buf: make([]uint32, size), + } +} + +func (cbuf *CircBufU32) Max() uint32 { + var max uint32 + for bufIdx := 0; bufIdx < cbuf.slotsUsed; bufIdx++ { + if cbuf.buf[bufIdx] > max { + max = cbuf.buf[bufIdx] + } + } + return max +} + +func (cbuf *CircBufU32) Average() uint32 { + var total uint64 + for bufIdx := 0; bufIdx < cbuf.slotsUsed; bufIdx++ { + total += uint64(cbuf.buf[bufIdx]) + } + return uint32(total / uint64(cbuf.slotsUsed)) +} + +func (cbuf *CircBufU32) Append(val uint32) { + cbuf.buf[cbuf.slot] = val + cbuf.slot++ + if cbuf.slotsUsed < cbuf.slot { + cbuf.slotsUsed = cbuf.slot + } + if cbuf.slot >= len(cbuf.buf) { + cbuf.slot = 0 + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/metrics_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/htraced/metrics_test.go b/htrace-htraced/go/src/htrace/htraced/metrics_test.go new file mode 100644 index 0000000..4f27ffd --- /dev/null +++ b/htrace-htraced/go/src/htrace/htraced/metrics_test.go @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package main + +import ( + "fmt" + htrace "htrace/client" + "htrace/common" + "htrace/conf" + "reflect" + "testing" + "time" +) + +func compareTotals(a, b common.SpanMetricsMap) bool { + for k, v := range a { + if !reflect.DeepEqual(v, b[k]) { + return false + } + } + for k, v := range b { + if !reflect.DeepEqual(v, a[k]) { + return false + } + } + return true +} + +type Fatalfer interface { + Fatalf(format string, args ...interface{}) +} + +func assertNumWrittenEquals(t Fatalfer, msink *MetricsSink, + expectedNumWritten int) { + var sstats common.ServerStats + msink.PopulateServerStats(&sstats) + if sstats.WrittenSpans != uint64(expectedNumWritten) { + t.Fatalf("sstats.WrittenSpans = %d, but expected %d\n", + sstats.WrittenSpans, len(SIMPLE_TEST_SPANS)) + } + if sstats.HostSpanMetrics["127.0.0.1"] == nil { + t.Fatalf("no entry for sstats.HostSpanMetrics[127.0.0.1] found.") + } + if sstats.HostSpanMetrics["127.0.0.1"].Written != + uint64(expectedNumWritten) { + t.Fatalf("sstats.HostSpanMetrics[127.0.0.1].Written = %d, but "+ + "expected %d\n", sstats.HostSpanMetrics["127.0.0.1"].Written, + len(SIMPLE_TEST_SPANS)) + } +} + +func TestMetricsSinkPerHostEviction(t *testing.T) { + cnfBld := conf.Builder{ + Values: conf.TEST_VALUES(), + Defaults: conf.DEFAULTS, + } + cnfBld.Values[conf.HTRACE_METRICS_MAX_ADDR_ENTRIES] = "2" + cnf, err := cnfBld.Build() + if err != nil { + t.Fatalf("failed to create conf: %s", err.Error()) + } + msink := NewMetricsSink(cnf) + msink.UpdatePersisted("192.168.0.100", 20, 10) + msink.UpdatePersisted("192.168.0.101", 20, 10) + msink.UpdatePersisted("192.168.0.102", 20, 10) + msink.lock.Lock() + defer msink.lock.Unlock() + if len(msink.HostSpanMetrics) != 2 { + for k, v := range msink.HostSpanMetrics { + fmt.Printf("WATERMELON: [%s] = [%s]\n", k, v) + } + t.Fatalf("Expected len(msink.HostSpanMetrics) to be 2, but got %d\n", + len(msink.HostSpanMetrics)) + } +} + +func TestIngestedSpansMetricsRest(t *testing.T) { + testIngestedSpansMetricsImpl(t, false) +} + +func TestIngestedSpansMetricsPacked(t *testing.T) { + testIngestedSpansMetricsImpl(t, true) +} + +func testIngestedSpansMetricsImpl(t *testing.T, usePacked bool) { + htraceBld := &MiniHTracedBuilder{Name: "TestIngestedSpansMetrics", + DataDirs: make([]string, 2), + } + ht, err := htraceBld.Build() + if err != nil { + t.Fatalf("failed to create datastore: %s", err.Error()) + } + defer ht.Close() + var hcl *htrace.Client + hcl, err = htrace.NewClient(ht.ClientConf(), &htrace.TestHooks{ + HrpcDisabled: !usePacked, + }) + if err != nil { + t.Fatalf("failed to create client: %s", err.Error()) + } + + NUM_TEST_SPANS := 12 + allSpans := createRandomTestSpans(NUM_TEST_SPANS) + err = hcl.WriteSpans(allSpans) + if err != nil { + t.Fatalf("WriteSpans failed: %s\n", err.Error()) + } + for { + var stats *common.ServerStats + stats, err = hcl.GetServerStats() + if err != nil { + t.Fatalf("GetServerStats failed: %s\n", err.Error()) + } + if stats.IngestedSpans == uint64(NUM_TEST_SPANS) { + break + } + time.Sleep(1 * time.Millisecond) + } +} + +func TestCircBuf32(t *testing.T) { + cbuf := NewCircBufU32(3) + // We arbitrarily define that empty circular buffers have an average of 0. 
+ if cbuf.Average() != 0 { + t.Fatalf("expected empty CircBufU32 to have an average of 0.\n") + } + if cbuf.Max() != 0 { + t.Fatalf("expected empty CircBufU32 to have a max of 0.\n") + } + cbuf.Append(2) + if cbuf.Average() != 2 { + t.Fatalf("expected one-element CircBufU32 to have an average of 2.\n") + } + cbuf.Append(10) + if cbuf.Average() != 6 { + t.Fatalf("expected two-element CircBufU32 to have an average of 6.\n") + } + cbuf.Append(12) + if cbuf.Average() != 8 { + t.Fatalf("expected three-element CircBufU32 to have an average of 8.\n") + } + cbuf.Append(14) + // The 14 overwrites the original 2 element. + if cbuf.Average() != 12 { + t.Fatalf("expected three-element CircBufU32 to have an average of 12.\n") + } + cbuf.Append(1) + // The 1 overwrites the original 10 element. + if cbuf.Average() != 9 { + t.Fatalf("expected three-element CircBufU32 to have an average of 12.\n") + } + if cbuf.Max() != 14 { + t.Fatalf("expected three-element CircBufU32 to have a max of 14.\n") + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/mini_htraced.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/htraced/mini_htraced.go b/htrace-htraced/go/src/htrace/htraced/mini_htraced.go new file mode 100644 index 0000000..af8d379 --- /dev/null +++ b/htrace-htraced/go/src/htrace/htraced/mini_htraced.go @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package main + +import ( + "fmt" + "htrace/common" + "htrace/conf" + "io/ioutil" + "net" + "os" + "strings" +) + +// +// MiniHTraceD is used in unit tests to set up a daemon with certain settings. +// It takes care of things like creating and cleaning up temporary directories. +// + +// The default number of managed data directories to use. +const DEFAULT_NUM_DATA_DIRS = 2 + +// Builds a MiniHTraced object. +type MiniHTracedBuilder struct { + // The name of the MiniHTraced to build. This shows up in the test directory name and some + // other places. + Name string + + // The configuration values to use for the MiniHTraced. + // If ths is nil, we use the default configuration for everything. + Cnf map[string]string + + // The DataDirs to use. Empty entries will turn into random names. + DataDirs []string + + // If true, we will keep the data dirs around after MiniHTraced#Close + KeepDataDirsOnClose bool + + // If non-null, the WrittenSpans semaphore to use when creating the DataStore. 
+ WrittenSpans *common.Semaphore + + // The test hooks to use for the HRPC server + HrpcTestHooks *hrpcTestHooks +} + +type MiniHTraced struct { + Name string + Cnf *conf.Config + DataDirs []string + Store *dataStore + Rsv *RestServer + Hsv *HrpcServer + Lg *common.Logger + KeepDataDirsOnClose bool +} + +func (bld *MiniHTracedBuilder) Build() (*MiniHTraced, error) { + var err error + var store *dataStore + var rsv *RestServer + var hsv *HrpcServer + if bld.Name == "" { + bld.Name = "HTraceTest" + } + if bld.Cnf == nil { + bld.Cnf = make(map[string]string) + } + if bld.DataDirs == nil { + bld.DataDirs = make([]string, 2) + } + for idx := range bld.DataDirs { + if bld.DataDirs[idx] == "" { + bld.DataDirs[idx], err = ioutil.TempDir(os.TempDir(), + fmt.Sprintf("%s%d", bld.Name, idx+1)) + if err != nil { + return nil, err + } + } + } + // Copy the default test configuration values. + for k, v := range conf.TEST_VALUES() { + _, hasVal := bld.Cnf[k] + if !hasVal { + bld.Cnf[k] = v + } + } + bld.Cnf[conf.HTRACE_DATA_STORE_DIRECTORIES] = + strings.Join(bld.DataDirs, conf.PATH_LIST_SEP) + cnfBld := conf.Builder{Values: bld.Cnf, Defaults: conf.DEFAULTS} + cnf, err := cnfBld.Build() + if err != nil { + return nil, err + } + lg := common.NewLogger("mini.htraced", cnf) + defer func() { + if err != nil { + if store != nil { + store.Close() + } + for idx := range bld.DataDirs { + if !bld.KeepDataDirsOnClose { + if bld.DataDirs[idx] != "" { + os.RemoveAll(bld.DataDirs[idx]) + } + } + } + if rsv != nil { + rsv.Close() + } + lg.Infof("Failed to create MiniHTraced %s: %s\n", bld.Name, err.Error()) + lg.Close() + } + }() + store, err = CreateDataStore(cnf, bld.WrittenSpans) + if err != nil { + return nil, err + } + rstListener, listenErr := net.Listen("tcp", cnf.Get(conf.HTRACE_WEB_ADDRESS)) + if listenErr != nil { + return nil, listenErr + } + defer func() { + if rstListener != nil { + rstListener.Close() + } + }() + rsv, err = CreateRestServer(cnf, store, rstListener) + if err != nil { + return nil, err + } + rstListener = nil + hsv, err = CreateHrpcServer(cnf, store, bld.HrpcTestHooks) + if err != nil { + return nil, err + } + + lg.Infof("Created MiniHTraced %s\n", bld.Name) + return &MiniHTraced{ + Name: bld.Name, + Cnf: cnf, + DataDirs: bld.DataDirs, + Store: store, + Rsv: rsv, + Hsv: hsv, + Lg: lg, + KeepDataDirsOnClose: bld.KeepDataDirsOnClose, + }, nil +} + +// Return a Config object that clients can use to connect to this MiniHTraceD. +func (ht *MiniHTraced) ClientConf() *conf.Config { + return ht.Cnf.Clone(conf.HTRACE_WEB_ADDRESS, ht.Rsv.Addr().String(), + conf.HTRACE_HRPC_ADDRESS, ht.Hsv.Addr().String()) +} + +// Return a Config object that clients can use to connect to this MiniHTraceD +// by HTTP only (no HRPC). 
+func (ht *MiniHTraced) RestOnlyClientConf() *conf.Config { + return ht.Cnf.Clone(conf.HTRACE_WEB_ADDRESS, ht.Rsv.Addr().String(), + conf.HTRACE_HRPC_ADDRESS, "") +} + +func (ht *MiniHTraced) Close() { + ht.Lg.Infof("Closing MiniHTraced %s\n", ht.Name) + ht.Rsv.Close() + ht.Hsv.Close() + ht.Store.Close() + if !ht.KeepDataDirsOnClose { + for idx := range ht.DataDirs { + ht.Lg.Infof("Removing %s...\n", ht.DataDirs[idx]) + os.RemoveAll(ht.DataDirs[idx]) + } + } + ht.Lg.Infof("Finished closing MiniHTraced %s\n", ht.Name) + ht.Lg.Close() +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/reaper_test.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/htraced/reaper_test.go b/htrace-htraced/go/src/htrace/htraced/reaper_test.go new file mode 100644 index 0000000..af11e38 --- /dev/null +++ b/htrace-htraced/go/src/htrace/htraced/reaper_test.go @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package main + +import ( + "fmt" + "htrace/common" + "htrace/conf" + "htrace/test" + "math/rand" + "testing" + "time" +) + +func TestReapingOldSpans(t *testing.T) { + const NUM_TEST_SPANS = 20 + testSpans := make([]*common.Span, NUM_TEST_SPANS) + rnd := rand.New(rand.NewSource(2)) + now := common.TimeToUnixMs(time.Now().UTC()) + for i := range testSpans { + testSpans[i] = test.NewRandomSpan(rnd, testSpans[0:i]) + testSpans[i].Begin = now - int64(NUM_TEST_SPANS-1-i) + testSpans[i].Description = fmt.Sprintf("Span%02d", i) + } + htraceBld := &MiniHTracedBuilder{Name: "TestReapingOldSpans", + Cnf: map[string]string{ + conf.HTRACE_SPAN_EXPIRY_MS: fmt.Sprintf("%d", 60*60*1000), + conf.HTRACE_REAPER_HEARTBEAT_PERIOD_MS: "1", + conf.HTRACE_DATASTORE_HEARTBEAT_PERIOD_MS: "1", + }, + WrittenSpans: common.NewSemaphore(0), + DataDirs: make([]string, 2), + } + ht, err := htraceBld.Build() + if err != nil { + t.Fatalf("failed to create mini htraced cluster: %s\n", err.Error()) + } + ing := ht.Store.NewSpanIngestor(ht.Store.lg, "127.0.0.1", "") + for spanIdx := range testSpans { + ing.IngestSpan(testSpans[spanIdx]) + } + ing.Close(time.Now()) + // Wait the spans to be created + ht.Store.WrittenSpans.Waits(NUM_TEST_SPANS) + // Set a reaper date that will remove all the spans except final one. 
+ ht.Store.rpr.SetReaperDate(now) + + common.WaitFor(5*time.Minute, time.Millisecond, func() bool { + for i := 0; i < NUM_TEST_SPANS-1; i++ { + span := ht.Store.FindSpan(testSpans[i].Id) + if span != nil { + ht.Store.lg.Debugf("Waiting for %s to be removed...\n", + testSpans[i].Description) + return false + } + } + span := ht.Store.FindSpan(testSpans[NUM_TEST_SPANS-1].Id) + if span == nil { + ht.Store.lg.Debugf("Did not expect %s to be removed\n", + testSpans[NUM_TEST_SPANS-1].Description) + return false + } + return true + }) + defer ht.Close() +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htraced/rest.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/htraced/rest.go b/htrace-htraced/go/src/htrace/htraced/rest.go new file mode 100644 index 0000000..1ba4791 --- /dev/null +++ b/htrace-htraced/go/src/htrace/htraced/rest.go @@ -0,0 +1,376 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package main + +import ( + "bytes" + "encoding/json" + "fmt" + "github.com/gorilla/mux" + "htrace/common" + "htrace/conf" + "net" + "net/http" + "os" + "path/filepath" + "strconv" + "strings" + "time" +) + +// Set the response headers. +func setResponseHeaders(hdr http.Header) { + hdr.Set("Content-Type", "application/json") +} + +// Write a JSON error response. 
+func writeError(lg *common.Logger, w http.ResponseWriter, errCode int, + errStr string) { + str := strings.Replace(errStr, `"`, `'`, -1) + lg.Info(str + "\n") + w.WriteHeader(errCode) + w.Write([]byte(`{ "error" : "` + str + `"}`)) +} + +type serverVersionHandler struct { + lg *common.Logger +} + +func (hand *serverVersionHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + setResponseHeaders(w.Header()) + version := common.ServerVersion{ReleaseVersion: RELEASE_VERSION, + GitVersion: GIT_VERSION} + buf, err := json.Marshal(&version) + if err != nil { + writeError(hand.lg, w, http.StatusInternalServerError, + fmt.Sprintf("error marshalling ServerVersion: %s\n", err.Error())) + return + } + if hand.lg.DebugEnabled() { + hand.lg.Debugf("Returned ServerVersion %s\n", string(buf)) + } + w.Write(buf) +} + +type serverDebugInfoHandler struct { + lg *common.Logger +} + +func (hand *serverDebugInfoHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + setResponseHeaders(w.Header()) + buf := make([]byte, 1<<20) + common.GetStackTraces(&buf) + resp := common.ServerDebugInfo{ + StackTraces: string(buf), + GCStats: common.GetGCStats(), + } + buf, err := json.Marshal(&resp) + if err != nil { + writeError(hand.lg, w, http.StatusInternalServerError, + fmt.Sprintf("error marshalling ServerDebugInfo: %s\n", err.Error())) + return + } + w.Write(buf) + hand.lg.Info("Returned ServerDebugInfo\n") +} + +type serverStatsHandler struct { + dataStoreHandler +} + +func (hand *serverStatsHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + setResponseHeaders(w.Header()) + hand.lg.Debugf("serverStatsHandler\n") + stats := hand.store.ServerStats() + buf, err := json.Marshal(&stats) + if err != nil { + writeError(hand.lg, w, http.StatusInternalServerError, + fmt.Sprintf("error marshalling ServerStats: %s\n", err.Error())) + return + } + hand.lg.Debugf("Returned ServerStats %s\n", string(buf)) + w.Write(buf) +} + +type serverConfHandler struct { + cnf *conf.Config + lg *common.Logger +} + +func (hand *serverConfHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + setResponseHeaders(w.Header()) + hand.lg.Debugf("serverConfHandler\n") + cnfMap := hand.cnf.Export() + buf, err := json.Marshal(&cnfMap) + if err != nil { + writeError(hand.lg, w, http.StatusInternalServerError, + fmt.Sprintf("error marshalling serverConf: %s\n", err.Error())) + return + } + hand.lg.Debugf("Returned server configuration %s\n", string(buf)) + w.Write(buf) +} + +type dataStoreHandler struct { + lg *common.Logger + store *dataStore +} + +func (hand *dataStoreHandler) parseSid(w http.ResponseWriter, + str string) (common.SpanId, bool) { + var id common.SpanId + err := id.FromString(str) + if err != nil { + writeError(hand.lg, w, http.StatusBadRequest, + fmt.Sprintf("Failed to parse span ID %s: %s", str, err.Error())) + w.Write([]byte("Error parsing : " + err.Error())) + return common.INVALID_SPAN_ID, false + } + return id, true +} + +func (hand *dataStoreHandler) getReqField32(fieldName string, w http.ResponseWriter, + req *http.Request) (int32, bool) { + str := req.FormValue(fieldName) + if str == "" { + writeError(hand.lg, w, http.StatusBadRequest, fmt.Sprintf("No %s specified.", fieldName)) + return -1, false + } + val, err := strconv.ParseUint(str, 16, 32) + if err != nil { + writeError(hand.lg, w, http.StatusBadRequest, + fmt.Sprintf("Error parsing %s: %s.", fieldName, err.Error())) + return -1, false + } + return int32(val), true +} + +type findSidHandler struct { + dataStoreHandler +} + +func 
(hand *findSidHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + setResponseHeaders(w.Header()) + req.ParseForm() + vars := mux.Vars(req) + stringSid := vars["id"] + sid, ok := hand.parseSid(w, stringSid) + if !ok { + return + } + hand.lg.Debugf("findSidHandler(sid=%s)\n", sid.String()) + span := hand.store.FindSpan(sid) + if span == nil { + writeError(hand.lg, w, http.StatusNoContent, + fmt.Sprintf("No such span as %s\n", sid.String())) + return + } + w.Write(span.ToJson()) +} + +type findChildrenHandler struct { + dataStoreHandler +} + +func (hand *findChildrenHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + setResponseHeaders(w.Header()) + req.ParseForm() + vars := mux.Vars(req) + stringSid := vars["id"] + sid, ok := hand.parseSid(w, stringSid) + if !ok { + return + } + var lim int32 + lim, ok = hand.getReqField32("lim", w, req) + if !ok { + return + } + hand.lg.Debugf("findChildrenHandler(sid=%s, lim=%d)\n", sid.String(), lim) + children := hand.store.FindChildren(sid, lim) + jbytes, err := json.Marshal(children) + if err != nil { + writeError(hand.lg, w, http.StatusInternalServerError, + fmt.Sprintf("Error marshalling children: %s", err.Error())) + return + } + w.Write(jbytes) +} + +type writeSpansHandler struct { + dataStoreHandler +} + +func (hand *writeSpansHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + startTime := time.Now() + setResponseHeaders(w.Header()) + client, _, serr := net.SplitHostPort(req.RemoteAddr) + if serr != nil { + writeError(hand.lg, w, http.StatusBadRequest, + fmt.Sprintf("Failed to split host and port for %s: %s\n", + req.RemoteAddr, serr.Error())) + return + } + dec := json.NewDecoder(req.Body) + var msg common.WriteSpansReq + err := dec.Decode(&msg) + if err != nil { + writeError(hand.lg, w, http.StatusBadRequest, + fmt.Sprintf("Error parsing WriteSpansReq: %s", err.Error())) + return + } + if hand.lg.TraceEnabled() { + hand.lg.Tracef("%s: read WriteSpans REST message: %s\n", + req.RemoteAddr, asJson(&msg)) + } + ing := hand.store.NewSpanIngestor(hand.lg, client, msg.DefaultTrid) + for spanIdx := 0; spanIdx < msg.NumSpans; spanIdx++ { + var span *common.Span + err := dec.Decode(&span) + if err != nil { + writeError(hand.lg, w, http.StatusBadRequest, + fmt.Sprintf("Failed to decode span %d out of %d: ", + spanIdx, msg.NumSpans, err.Error())) + return + } + ing.IngestSpan(span) + } + ing.Close(startTime) + return +} + +type queryHandler struct { + lg *common.Logger + dataStoreHandler +} + +func (hand *queryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + setResponseHeaders(w.Header()) + queryString := req.FormValue("query") + if queryString == "" { + writeError(hand.lg, w, http.StatusBadRequest, "No query provided.\n") + return + } + var query common.Query + reader := bytes.NewBufferString(queryString) + dec := json.NewDecoder(reader) + err := dec.Decode(&query) + if err != nil { + writeError(hand.lg, w, http.StatusBadRequest, + fmt.Sprintf("Error parsing query '%s': %s", queryString, err.Error())) + return + } + var results []*common.Span + results, err, _ = hand.store.HandleQuery(&query) + if err != nil { + writeError(hand.lg, w, http.StatusInternalServerError, + fmt.Sprintf("Internal error processing query %s: %s", + query.String(), err.Error())) + return + } + var jbytes []byte + jbytes, err = json.Marshal(results) + if err != nil { + writeError(hand.lg, w, http.StatusInternalServerError, + fmt.Sprintf("Error marshalling results: %s", err.Error())) + return + } + w.Write(jbytes) +} + +type 
logErrorHandler struct { + lg *common.Logger +} + +func (hand *logErrorHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + hand.lg.Errorf("Got unknown request %s\n", req.RequestURI) + writeError(hand.lg, w, http.StatusBadRequest, "Unknown request.") +} + +type RestServer struct { + http.Server + listener net.Listener + lg *common.Logger +} + +func CreateRestServer(cnf *conf.Config, store *dataStore, + listener net.Listener) (*RestServer, error) { + var err error + rsv := &RestServer{} + rsv.lg = common.NewLogger("rest", cnf) + + r := mux.NewRouter().StrictSlash(false) + + r.Handle("/server/info", &serverVersionHandler{lg: rsv.lg}).Methods("GET") + r.Handle("/server/version", &serverVersionHandler{lg: rsv.lg}).Methods("GET") + r.Handle("/server/debugInfo", &serverDebugInfoHandler{lg: rsv.lg}).Methods("GET") + + serverStatsH := &serverStatsHandler{dataStoreHandler: dataStoreHandler{ + store: store, lg: rsv.lg}} + r.Handle("/server/stats", serverStatsH).Methods("GET") + + serverConfH := &serverConfHandler{cnf: cnf, lg: rsv.lg} + r.Handle("/server/conf", serverConfH).Methods("GET") + + writeSpansH := &writeSpansHandler{dataStoreHandler: dataStoreHandler{ + store: store, lg: rsv.lg}} + r.Handle("/writeSpans", writeSpansH).Methods("POST") + + queryH := &queryHandler{lg: rsv.lg, dataStoreHandler: dataStoreHandler{store: store}} + r.Handle("/query", queryH).Methods("GET") + + span := r.PathPrefix("/span").Subrouter() + findSidH := &findSidHandler{dataStoreHandler: dataStoreHandler{store: store, lg: rsv.lg}} + span.Handle("/{id}", findSidH).Methods("GET") + + findChildrenH := &findChildrenHandler{dataStoreHandler: dataStoreHandler{store: store, + lg: rsv.lg}} + span.Handle("/{id}/children", findChildrenH).Methods("GET") + + // Default Handler. This will serve requests for static requests. + webdir := os.Getenv("HTRACED_WEB_DIR") + if webdir == "" { + webdir, err = filepath.Abs(filepath.Join(filepath.Dir(os.Args[0]), "..", "web")) + if err != nil { + return nil, err + } + } + + rsv.lg.Infof(`Serving static files from "%s"`+"\n", webdir) + r.PathPrefix("/").Handler(http.FileServer(http.Dir(webdir))).Methods("GET") + + // Log an error message for unknown non-GET requests. + r.PathPrefix("/").Handler(&logErrorHandler{lg: rsv.lg}) + + rsv.listener = listener + rsv.Handler = r + rsv.ErrorLog = rsv.lg.Wrap("[REST] ", common.INFO) + go rsv.Serve(rsv.listener) + rsv.lg.Infof("Started REST server on %s\n", rsv.listener.Addr().String()) + return rsv, nil +} + +func (rsv *RestServer) Addr() net.Addr { + return rsv.listener.Addr() +} + +func (rsv *RestServer) Close() { + rsv.listener.Close() +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htracedTool/cmd.go ---------------------------------------------------------------------- diff --git a/htrace-htraced/go/src/htrace/htracedTool/cmd.go b/htrace-htraced/go/src/htrace/htracedTool/cmd.go new file mode 100644 index 0000000..65b67e5 --- /dev/null +++ b/htrace-htraced/go/src/htrace/htracedTool/cmd.go @@ -0,0 +1,442 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package main + +import ( + "bufio" + "bytes" + "encoding/json" + "errors" + "fmt" + "github.com/alecthomas/kingpin" + htrace "htrace/client" + "htrace/common" + "htrace/conf" + "io" + "os" + "sort" + "strings" + "text/tabwriter" + "time" +) + +var RELEASE_VERSION string +var GIT_VERSION string + +const EXIT_SUCCESS = 0 +const EXIT_FAILURE = 1 + +var verbose bool + +const USAGE = `The Apache HTrace command-line tool. This tool retrieves and modifies settings and +other data on a running htraced daemon. + +If we find an ` + conf.CONFIG_FILE_NAME + ` configuration file in the list of directories +specified in ` + conf.HTRACED_CONF_DIR + `, we will use that configuration; otherwise, +the defaults will be used. +` + +func main() { + // Load htraced configuration + cnf, cnfLog := conf.LoadApplicationConfig("htrace.tool.") + lg := common.NewLogger("conf", cnf) + defer lg.Close() + scanner := bufio.NewScanner(cnfLog) + for scanner.Scan() { + lg.Debugf(scanner.Text() + "\n") + } + + // Parse argv + app := kingpin.New(os.Args[0], USAGE) + app.Flag("Dmy.key", "Set configuration key 'my.key' to 'my.value'. Replace 'my.key' "+ + "with any key you want to set.").Default("my.value").String() + addr := app.Flag("addr", "Server address.").String() + verbose = *app.Flag("verbose", "Verbose.").Default("false").Bool() + version := app.Command("version", "Print the version of this program.") + serverVersion := app.Command("serverVersion", "Print the version of the htraced server.") + serverStats := app.Command("serverStats", "Print statistics retrieved from the htraced server.") + serverStatsJson := serverStats.Flag("json", "Display statistics as raw JSON.").Default("false").Bool() + serverDebugInfo := app.Command("serverDebugInfo", "Print the debug info of the htraced server.") + serverConf := app.Command("serverConf", "Print the server configuration retrieved from the htraced server.") + findSpan := app.Command("findSpan", "Print information about a trace span with a given ID.") + findSpanId := findSpan.Arg("id", "Span ID to find. Example: be305e54-4534-2110-a0b2-e06b9effe112").Required().String() + findChildren := app.Command("findChildren", "Print out the span IDs that are children of a given span ID.") + parentSpanId := findChildren.Arg("id", "Span ID to print children for. Example: be305e54-4534-2110-a0b2-e06b9effe112"). 
+ Required().String() + childLim := findChildren.Flag("lim", "Maximum number of child IDs to print.").Default("20").Int() + loadFile := app.Command("loadFile", "Write whitespace-separated JSON spans from a file to the server.") + loadFilePath := loadFile.Arg("path", + "A file containing whitespace-separated span JSON.").Required().String() + loadJson := app.Command("load", "Write JSON spans from the command-line to the server.") + loadJsonArg := loadJson.Arg("json", "A JSON span to write to the server.").Required().String() + dumpAll := app.Command("dumpAll", "Dump all spans from the htraced daemon.") + dumpAllOutPath := dumpAll.Arg("path", "The path to dump the trace spans to.").Default("-").String() + dumpAllLim := dumpAll.Flag("lim", "The number of spans to transfer from the server at once."). + Default("100").Int() + graph := app.Command("graph", "Visualize span JSON as a graph.") + graphJsonFile := graph.Arg("input", "The JSON file to load").Required().String() + graphDotFile := graph.Flag("output", + "The path to write a GraphViz dotfile to. This file can be used as input to "+ + "GraphViz, in order to generate a pretty picture. See graphviz.org for more "+ + "information about generating pictures of graphs.").Default("-").String() + query := app.Command("query", "Send a query to htraced.") + queryLim := query.Flag("lim", "Maximum number of spans to retrieve.").Default("20").Int() + queryArg := query.Arg("query", "The query string to send. Query strings have the format "+ + "[TYPE] [OPERATOR] [CONST], joined by AND statements.").Required().String() + rawQuery := app.Command("rawQuery", "Send a raw JSON query to htraced.") + rawQueryArg := rawQuery.Arg("json", "The query JSON to send.").Required().String() + cmd := kingpin.MustParse(app.Parse(os.Args[1:])) + + // Add the command-line settings into the configuration. + if *addr != "" { + cnf = cnf.Clone(conf.HTRACE_WEB_ADDRESS, *addr) + } + + // Handle commands that don't require an HTrace client. + switch cmd { + case version.FullCommand(): + os.Exit(printVersion()) + case graph.FullCommand(): + err := jsonSpanFileToDotFile(*graphJsonFile, *graphDotFile) + if err != nil { + fmt.Printf("graphing error: %s\n", err.Error()) + os.Exit(EXIT_FAILURE) + } + os.Exit(EXIT_SUCCESS) + } + + // Create HTrace client + hcl, err := htrace.NewClient(cnf, nil) + if err != nil { + fmt.Printf("Failed to create HTrace client: %s\n", err.Error()) + os.Exit(EXIT_FAILURE) + } + + // Handle commands that require an HTrace client. 
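+	// Illustrative invocations of the commands dispatched below; the flag and
+	// argument shapes follow the kingpin definitions above, and the example
+	// span ID and output path are placeholders:
+	//
+	//   htracedTool serverStats --json
+	//   htracedTool findSpan be305e54-4534-2110-a0b2-e06b9effe112
+	//   htracedTool findChildren be305e54-4534-2110-a0b2-e06b9effe112 --lim 20
+	//   htracedTool dumpAll spans.json --lim 100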
+	switch cmd {
+	case version.FullCommand():
+		os.Exit(printVersion())
+	case serverVersion.FullCommand():
+		os.Exit(printServerVersion(hcl))
+	case serverStats.FullCommand():
+		if *serverStatsJson {
+			os.Exit(printServerStatsJson(hcl))
+		} else {
+			os.Exit(printServerStats(hcl))
+		}
+	case serverDebugInfo.FullCommand():
+		os.Exit(printServerDebugInfo(hcl))
+	case serverConf.FullCommand():
+		os.Exit(printServerConfJson(hcl))
+	case findSpan.FullCommand():
+		var id common.SpanId
+		if err := id.FromString(*findSpanId); err != nil {
+			fmt.Printf("Failed to parse span ID %s: %s\n", *findSpanId, err.Error())
+			os.Exit(EXIT_FAILURE)
+		}
+		os.Exit(doFindSpan(hcl, id))
+	case findChildren.FullCommand():
+		var id common.SpanId
+		if err := id.FromString(*parentSpanId); err != nil {
+			fmt.Printf("Failed to parse span ID %s: %s\n", *parentSpanId, err.Error())
+			os.Exit(EXIT_FAILURE)
+		}
+		os.Exit(doFindChildren(hcl, id, *childLim))
+	case loadJson.FullCommand():
+		os.Exit(doLoadSpanJson(hcl, *loadJsonArg))
+	case loadFile.FullCommand():
+		os.Exit(doLoadSpanJsonFile(hcl, *loadFilePath))
+	case dumpAll.FullCommand():
+		err := doDumpAll(hcl, *dumpAllOutPath, *dumpAllLim)
+		if err != nil {
+			fmt.Printf("dumpAll error: %s\n", err.Error())
+			os.Exit(EXIT_FAILURE)
+		}
+		os.Exit(EXIT_SUCCESS)
+	case query.FullCommand():
+		err := doQueryFromString(hcl, *queryArg, *queryLim)
+		if err != nil {
+			fmt.Printf("query error: %s\n", err.Error())
+			os.Exit(EXIT_FAILURE)
+		}
+		os.Exit(EXIT_SUCCESS)
+	case rawQuery.FullCommand():
+		err := doRawQuery(hcl, *rawQueryArg)
+		if err != nil {
+			fmt.Printf("raw query error: %s\n", err.Error())
+			os.Exit(EXIT_FAILURE)
+		}
+		os.Exit(EXIT_SUCCESS)
+	}
+
+	app.UsageErrorf(os.Stderr, "You must supply a command to run.")
+}
+
+// Print the version of the htrace binary.
+func printVersion() int {
+	fmt.Printf("Running htracedTool %s [%s].\n", RELEASE_VERSION, GIT_VERSION)
+	return EXIT_SUCCESS
+}
+
+// Print information retrieved from an htraced server via /server/info
+func printServerVersion(hcl *htrace.Client) int {
+	ver, err := hcl.GetServerVersion()
+	if err != nil {
+		fmt.Println(err.Error())
+		return EXIT_FAILURE
+	}
+	fmt.Printf("HTraced server version %s (%s)\n", ver.ReleaseVersion, ver.GitVersion)
+	return EXIT_SUCCESS
+}
+
+// Print statistics retrieved from an htraced server via /server/stats
+func printServerStats(hcl *htrace.Client) int {
+	stats, err := hcl.GetServerStats()
+	if err != nil {
+		fmt.Println(err.Error())
+		return EXIT_FAILURE
+	}
+	w := new(tabwriter.Writer)
+	w.Init(os.Stdout, 0, 8, 0, '\t', 0)
+	fmt.Fprintf(w, "HTRACED SERVER STATS\n")
+	fmt.Fprintf(w, "Datastore Start\t%s\n",
+		common.UnixMsToTime(stats.LastStartMs).Format(time.RFC3339))
+	fmt.Fprintf(w, "Server Time\t%s\n",
+		common.UnixMsToTime(stats.CurMs).Format(time.RFC3339))
+	fmt.Fprintf(w, "Spans reaped\t%d\n", stats.ReapedSpans)
+	fmt.Fprintf(w, "Spans ingested\t%d\n", stats.IngestedSpans)
+	fmt.Fprintf(w, "Spans written\t%d\n", stats.WrittenSpans)
+	fmt.Fprintf(w, "Spans dropped by server\t%d\n", stats.ServerDroppedSpans)
+	dur := time.Millisecond * time.Duration(stats.AverageWriteSpansLatencyMs)
+	fmt.Fprintf(w, "Average WriteSpan Latency\t%s\n", dur.String())
+	dur = time.Millisecond * time.Duration(stats.MaxWriteSpansLatencyMs)
+	fmt.Fprintf(w, "Maximum WriteSpan Latency\t%s\n", dur.String())
+	fmt.Fprintf(w, "Number of leveldb directories\t%d\n", len(stats.Dirs))
+	w.Flush()
+	fmt.Println("")
+	for i := range stats.Dirs {
+		dir := stats.Dirs[i]
+		fmt.Printf("==== %s ===\n", dir.Path)
+		fmt.Printf("Approximate number of bytes: %d\n", dir.ApproximateBytes)
+		stats := strings.Replace(dir.LevelDbStats, "\\n", "\n", -1)
+		fmt.Printf("%s\n", stats)
+	}
+	w = new(tabwriter.Writer)
+	w.Init(os.Stdout, 0, 8, 0, '\t', 0)
+	fmt.Fprintf(w, "HOST 
SPAN METRICS\n") + mtxMap := stats.HostSpanMetrics + keys := make(sort.StringSlice, len(mtxMap)) + i := 0 + for k, _ := range mtxMap { + keys[i] = k + i++ + } + sort.Sort(keys) + for k := range keys { + mtx := mtxMap[keys[k]] + fmt.Fprintf(w, "%s\twritten: %d\tserver dropped: %d\n", + keys[k], mtx.Written, mtx.ServerDropped) + } + w.Flush() + return EXIT_SUCCESS +} + +// Print information retrieved from an htraced server via /server/info as JSON +func printServerStatsJson(hcl *htrace.Client) int { + stats, err := hcl.GetServerStats() + if err != nil { + fmt.Println(err.Error()) + return EXIT_FAILURE + } + buf, err := json.MarshalIndent(stats, "", " ") + if err != nil { + fmt.Printf("Error marshalling server stats: %s", err.Error()) + return EXIT_FAILURE + } + fmt.Printf("%s\n", string(buf)) + return EXIT_SUCCESS +} + +// Print information retrieved from an htraced server via /server/debugInfo +func printServerDebugInfo(hcl *htrace.Client) int { + stats, err := hcl.GetServerDebugInfo() + if err != nil { + fmt.Println(err.Error()) + return EXIT_FAILURE + } + fmt.Println("=== GOROUTINE STACKS ===") + fmt.Print(stats.StackTraces) + fmt.Println("=== END GOROUTINE STACKS ===") + fmt.Println("=== GC STATISTICS ===") + fmt.Print(stats.GCStats) + fmt.Println("=== END GC STATISTICS ===") + return EXIT_SUCCESS +} + +// Print information retrieved from an htraced server via /server/conf as JSON +func printServerConfJson(hcl *htrace.Client) int { + cnf, err := hcl.GetServerConf() + if err != nil { + fmt.Println(err.Error()) + return EXIT_FAILURE + } + buf, err := json.MarshalIndent(cnf, "", " ") + if err != nil { + fmt.Printf("Error marshalling server conf: %s", err.Error()) + return EXIT_FAILURE + } + fmt.Printf("%s\n", string(buf)) + return EXIT_SUCCESS +} + +// Print information about a trace span. +func doFindSpan(hcl *htrace.Client, sid common.SpanId) int { + span, err := hcl.FindSpan(sid) + if err != nil { + fmt.Println(err.Error()) + return EXIT_FAILURE + } + if span == nil { + fmt.Printf("Span ID not found.\n") + return EXIT_FAILURE + } + pbuf, err := json.MarshalIndent(span, "", " ") + if err != nil { + fmt.Printf("Error: error pretty-printing span to JSON: %s\n", err.Error()) + return EXIT_FAILURE + } + fmt.Printf("%s\n", string(pbuf)) + return EXIT_SUCCESS +} + +func doLoadSpanJsonFile(hcl *htrace.Client, spanFile string) int { + if spanFile == "" { + fmt.Printf("You must specify the json file to load.\n") + return EXIT_FAILURE + } + file, err := OpenInputFile(spanFile) + if err != nil { + fmt.Printf("Failed to open %s: %s\n", spanFile, err.Error()) + return EXIT_FAILURE + } + defer file.Close() + return doLoadSpans(hcl, bufio.NewReader(file)) +} + +func doLoadSpanJson(hcl *htrace.Client, spanJson string) int { + return doLoadSpans(hcl, bytes.NewBufferString(spanJson)) +} + +func doLoadSpans(hcl *htrace.Client, reader io.Reader) int { + dec := json.NewDecoder(reader) + spans := make([]*common.Span, 0, 32) + var err error + for { + var span common.Span + if err = dec.Decode(&span); err != nil { + if err == io.EOF { + break + } + fmt.Printf("Failed to decode JSON: %s\n", err.Error()) + return EXIT_FAILURE + } + spans = append(spans, &span) + } + if verbose { + fmt.Printf("Writing ") + prefix := "" + for i := range spans { + fmt.Printf("%s%s", prefix, spans[i].ToJson()) + prefix = ", " + } + fmt.Printf("\n") + } + err = hcl.WriteSpans(spans) + if err != nil { + fmt.Println(err.Error()) + return EXIT_FAILURE + } + return EXIT_SUCCESS +} + +// Find information about the children of a span. 
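+// The matching span IDs are pretty-printed to stdout as a JSON array.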
+func doFindChildren(hcl *htrace.Client, sid common.SpanId, lim int) int {
+	spanIds, err := hcl.FindChildren(sid, lim)
+	if err != nil {
+		fmt.Printf("%s\n", err.Error())
+		return EXIT_FAILURE
+	}
+	pbuf, err := json.MarshalIndent(spanIds, "", " ")
+	if err != nil {
+		fmt.Printf("Error: error pretty-printing span IDs to JSON: %s\n", err.Error())
+		return EXIT_FAILURE
+	}
+	fmt.Printf("%s\n", string(pbuf))
+	return EXIT_SUCCESS
+}
+
+// Dump all spans from the htraced daemon.
+func doDumpAll(hcl *htrace.Client, outPath string, lim int) error {
+	file, err := CreateOutputFile(outPath)
+	if err != nil {
+		return err
+	}
+	w := bufio.NewWriter(file)
+	defer func() {
+		if file != nil {
+			w.Flush()
+			file.Close()
+		}
+	}()
+	out := make(chan *common.Span, 50)
+	var dumpErr error
+	go func() {
+		dumpErr = hcl.DumpAll(lim, out)
+	}()
+	var numSpans int64
+	nextLogTime := time.Now().Add(time.Second * 5)
+	for {
+		span, channelOpen := <-out
+		if !channelOpen {
+			break
+		}
+		if err == nil {
+			_, err = fmt.Fprintf(w, "%s\n", span.ToJson())
+		}
+		if verbose {
+			numSpans++
+			now := time.Now()
+			if !now.Before(nextLogTime) {
+				nextLogTime = now.Add(time.Second * 5)
+				fmt.Printf("received %d span(s)...\n", numSpans)
+			}
+		}
+	}
+	if err != nil {
+		return errors.New(fmt.Sprintf("Write error %s", err.Error()))
+	}
+	if dumpErr != nil {
+		return errors.New(fmt.Sprintf("Dump error %s", dumpErr.Error()))
+	}
+	err = w.Flush()
+	if err != nil {
+		return err
+	}
+	err = file.Close()
+	file = nil
+	if err != nil {
+		return err
+	}
+	return nil
+}
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/5737e65b/htrace-htraced/go/src/htrace/htracedTool/file.go
----------------------------------------------------------------------
diff --git a/htrace-htraced/go/src/htrace/htracedTool/file.go b/htrace-htraced/go/src/htrace/htracedTool/file.go
new file mode 100644
index 0000000..ca9c18d
--- /dev/null
+++ b/htrace-htraced/go/src/htrace/htracedTool/file.go
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package main
+
+import (
+	"bufio"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"htrace/common"
+	"io"
+	"os"
+)
+
+// A file used for input.
+// Transparently supports using stdin for input.
+type InputFile struct {
+	*os.File
+	path string
+}
+
+// Open an input file. Stdin will be used when the path is "-".
+func OpenInputFile(path string) (*InputFile, error) {
+	if path == "-" {
+		return &InputFile{File: os.Stdin, path: path}, nil
+	}
+	file, err := os.Open(path)
+	if err != nil {
+		return nil, err
+	}
+	return &InputFile{File: file, path: path}, nil
+}
+
+func (file *InputFile) Close() {
+	if file.path != "-" {
+		file.File.Close()
+	}
+}
+
+// A file used for output.
+// Transparently supports using stdout for output.
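+// As with InputFile, a path of "-" selects the standard stream, and Close
+// becomes a no-op so that callers can unconditionally defer it.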
+type OutputFile struct { + *os.File + path string +} + +// Create an output file. Stdout will be used when path is - +func CreateOutputFile(path string) (*OutputFile, error) { + if path == "-" { + return &OutputFile{File: os.Stdout, path: path}, nil + } + file, err := os.Create(path) + if err != nil { + return nil, err + } + return &OutputFile{File: file, path: path}, nil +} + +func (file *OutputFile) Close() error { + if file.path != "-" { + return file.File.Close() + } + return nil +} + +// FailureDeferringWriter is a writer which allows us to call Printf multiple +// times and then check if all the printfs succeeded at the very end, rather +// than checking after each call. We will not attempt to write more data +// after the first write failure. +type FailureDeferringWriter struct { + io.Writer + err error +} + +func NewFailureDeferringWriter(writer io.Writer) *FailureDeferringWriter { + return &FailureDeferringWriter{writer, nil} +} + +func (w *FailureDeferringWriter) Printf(format string, v ...interface{}) { + if w.err != nil { + return + } + str := fmt.Sprintf(format, v...) + _, err := w.Writer.Write([]byte(str)) + if err != nil { + w.err = err + } +} + +func (w *FailureDeferringWriter) Error() error { + return w.err +} + +// Read a file full of whitespace-separated span JSON into a slice of spans. +func readSpansFile(path string) (common.SpanSlice, error) { + file, err := OpenInputFile(path) + if err != nil { + return nil, err + } + defer file.Close() + return readSpans(bufio.NewReader(file)) +} + +// Read whitespace-separated span JSON into a slice of spans. +func readSpans(reader io.Reader) (common.SpanSlice, error) { + spans := make(common.SpanSlice, 0) + dec := json.NewDecoder(reader) + for { + var span common.Span + err := dec.Decode(&span) + if err != nil { + if err != io.EOF { + return nil, errors.New(fmt.Sprintf("Decode error after decoding %d "+ + "span(s): %s", len(spans), err.Error())) + } + break + } + spans = append(spans, &span) + } + return spans, nil +}
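FailureDeferringWriter and readSpansFile above are small utilities used by the tool's commands. The sketch below shows how they might be combined; writeSpanSummaries is an illustrative helper, not part of the patch, and it assumes it lives in the same htracedTool package where EXIT_SUCCESS and EXIT_FAILURE are defined in cmd.go.

package main

import (
	"fmt"
	"os"
)

// Illustrative helper: print a one-line summary per span read from the given
// path ("-" for stdin), deferring the write-error check until the end.
func writeSpanSummaries(path string) int {
	spans, err := readSpansFile(path)
	if err != nil {
		fmt.Fprintf(os.Stderr, "failed to read spans: %s\n", err.Error())
		return EXIT_FAILURE
	}
	w := NewFailureDeferringWriter(os.Stdout)
	for i := range spans {
		w.Printf("%s %s\n", spans[i].Id.String(), spans[i].Description)
	}
	// A single check covers every Printf above; FailureDeferringWriter stops
	// writing after the first failure and remembers that error.
	if w.Error() != nil {
		fmt.Fprintf(os.Stderr, "write error: %s\n", w.Error().Error())
		return EXIT_FAILURE
	}
	return EXIT_SUCCESS
}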
