Repository: incubator-htrace Updated Branches: refs/heads/master fc7bf2383 -> 192595391
HTRACE-9: Add standalone htraced server Project: http://git-wip-us.apache.org/repos/asf/incubator-htrace/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-htrace/commit/19259539 Tree: http://git-wip-us.apache.org/repos/asf/incubator-htrace/tree/19259539 Diff: http://git-wip-us.apache.org/repos/asf/incubator-htrace/diff/19259539 Branch: refs/heads/master Commit: 192595391a636bbd5a2a1eac7719462df3b06fee Parents: fc7bf23 Author: Colin Patrick Mccabe <[email protected]> Authored: Fri Dec 5 14:11:47 2014 -0800 Committer: Colin Patrick Mccabe <[email protected]> Committed: Fri Dec 5 16:50:28 2014 -0800 ---------------------------------------------------------------------- htrace-core/pom.xml | 35 ++ htrace-core/src/go/BUILDING.txt | 30 ++ htrace-core/src/go/format.sh | 40 ++ htrace-core/src/go/gobuild.sh | 80 ++++ .../src/go/src/org/apache/htrace/common/rest.go | 26 ++ .../src/go/src/org/apache/htrace/common/span.go | 85 ++++ .../src/org/apache/htrace/common/span_test.go | 67 +++ .../src/org/apache/htrace/common/test_util.go | 74 ++++ .../go/src/org/apache/htrace/common/version.go | 22 + .../src/go/src/org/apache/htrace/conf/config.go | 201 +++++++++ .../src/org/apache/htrace/conf/config_keys.go | 62 +++ .../src/org/apache/htrace/conf/config_test.go | 121 +++++ .../src/go/src/org/apache/htrace/conf/xml.go | 61 +++ .../src/go/src/org/apache/htrace/htrace/cmd.go | 157 +++++++ .../src/org/apache/htrace/htraced/datastore.go | 438 +++++++++++++++++++ .../org/apache/htrace/htraced/datastore_test.go | 144 ++++++ .../go/src/org/apache/htrace/htraced/htraced.go | 34 ++ .../org/apache/htrace/htraced/mini_htraced.go | 118 +++++ .../go/src/org/apache/htrace/htraced/rest.go | 142 ++++++ .../src/go/src/org/apache/htrace/test/random.go | 71 +++ 20 files changed, 2008 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/19259539/htrace-core/pom.xml 
---------------------------------------------------------------------- diff --git a/htrace-core/pom.xml b/htrace-core/pom.xml index a8ea8f1..47677d8 100644 --- a/htrace-core/pom.xml +++ b/htrace-core/pom.xml @@ -66,6 +66,41 @@ language governing permissions and limitations under the License. --> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <phase>compile</phase> + <id>go_compile</id> + <goals><goal>run</goal></goals> + <configuration> + <tasks> + <exec executable="./gobuild.sh" + dir="${basedir}/src/go/" + failonerror="true"> + </exec> + </tasks> + <goals>run</goals> + </configuration> + </execution> + <execution> + <phase>test</phase> + <id>go_test</id> + <goals><goal>run</goal></goals> + <configuration> + <tasks> + <exec executable="./gobuild.sh" + dir="${basedir}/src/go/" + failonerror="true"> + <arg value="test"/> + </exec> + </tasks> + <goals>run</goals> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-gpg-plugin</artifactId> </plugin> <plugin> http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/19259539/htrace-core/src/go/BUILDING.txt ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/BUILDING.txt b/htrace-core/src/go/BUILDING.txt new file mode 100644 index 0000000..d54d410 --- /dev/null +++ b/htrace-core/src/go/BUILDING.txt @@ -0,0 +1,30 @@ +Building the HTrace Go code +=========================== +The htrace go code consists of 4 main parts: +* The "htraced" standalone server + This is a server which accepts trace spans, and services REST queries. + +* The "htrace" command-line program which can query the server + This is a simple command-line program which can query the htrace server. 
+ +* The htraced Javascript Web UI (not yet implemented) + +* The htrace go client library (not yet implemented) + This is the equivalent of the Java HTrace client library, but written in Go. + +You can build all these parts simply by running "gobuild.sh". +The binaries will be created in bin/. + +Dependencies +============ +You will need to install: +* The Go programming language +* The development package for leveldb (some Linux distros call this "leveldb-devel") containing libleveldb.so + +htraced requires libleveldb.so to be in your shared library path in order to run. +You can set LD_LIBRARY_PATH to the path for this library, or simply install +libleveldb.so to your system library path. + +Testing +======= +You can run the unit tests by running "./gobuild.sh test" http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/19259539/htrace-core/src/go/format.sh ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/format.sh b/htrace-core/src/go/format.sh new file mode 100755 index 0000000..6b1c8ae --- /dev/null +++ b/htrace-core/src/go/format.sh @@ -0,0 +1,40 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# +# Reformats the HTrace code. 
+# +# ./format.sh Reformats all code. +# + +die() { + echo $@ + exit 1 +} + +# Check for gofmt. It should be installed whenever the go development tools +# are installed. +which gofmt &> /dev/null +[ $? -ne 0 ] && die "You must install the gofmt code reformatting formatting tool." + +# Find go sources. We assume no newlines or whitespace in file names. +SCRIPT_DIR="$(cd "$( dirname $0 )" && pwd)" +find "${SCRIPT_DIR}" -noleaf -xdev -name '*.go' | xargs -l gofmt -w http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/19259539/htrace-core/src/go/gobuild.sh ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/gobuild.sh b/htrace-core/src/go/gobuild.sh new file mode 100755 index 0000000..135bdd2 --- /dev/null +++ b/htrace-core/src/go/gobuild.sh @@ -0,0 +1,80 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# +# Builds the HTrace server code. +# +# ./gobuild.sh Builds the code. +# ./gobuild.sh test Builds and runs all unit tests. +# ./gobuild.sh bench Builds and runs all benchmarks +# + +die() { + echo $@ + exit 1 +} + +ACTION=get +if [ $# -gt 0 ]; then + if [ "x${1}" == "xbench" ]; then + # run benchmarks + ACTION="test" + set -- "$@" -test.bench=. 
+ else + # run specified action + ACTION="${1}" + fi + shift +fi + +SCRIPT_DIR="$(cd "$( dirname $0 )" && pwd)" +export GOPATH="$GOPATH:${SCRIPT_DIR}" +export GOBIN="${SCRIPT_DIR}/bin" +mkdir -p ${GOBIN} || die "failed to create ${GOBIN}" + +# Check for go +which go &> /dev/null +if [ $? -ne 0 ]; then + cat <<EOF +You must install the Golang programming language. + +If you are using Debian, try "apt-get install golang". +For Red Hat, try "yum install go". +For other distributions and operating systems use your packaging tool. +EOF +exit 1 +fi + +# Check for libleveldb.so +if [ -x "/sbin/ldconfig" ]; then + # Suse requires ldconfig to be run via the absolute path + ldconfig=/sbin/ldconfig +else + ldconfig=ldconfig +fi +if "${ldconfig}" -p | grep -q libleveldb; then + : +else + echo "You must install the leveldb-devel package (or distro-specific equivalent.)" + exit 1 +fi + +go "${ACTION}" -v org/apache/htrace/htraced org/apache/htrace/htrace "$@" http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/19259539/htrace-core/src/go/src/org/apache/htrace/common/rest.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/common/rest.go b/htrace-core/src/go/src/org/apache/htrace/common/rest.go new file mode 100644 index 0000000..1b1db03 --- /dev/null +++ b/htrace-core/src/go/src/org/apache/htrace/common/rest.go @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package common + +// Info returned by /serverInfo +type ServerInfo struct { + // The server release version. + Version string +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/19259539/htrace-core/src/go/src/org/apache/htrace/common/span.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/common/span.go b/htrace-core/src/go/src/org/apache/htrace/common/span.go new file mode 100644 index 0000000..6f67ce6 --- /dev/null +++ b/htrace-core/src/go/src/org/apache/htrace/common/span.go @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package common + +import ( + "encoding/json" +) + +// +// Represents a trace span. 
+// +// Compatibility notes: +// We use signed numbers here, even in cases where unsigned would make more sense. This is because +// Java doesn't support unsigned integers, and we'd like to match the representation used by the +// Java client. For example, if we log a message about a span id in the Java client, it would be +// nice if we could match it up with a log message about the same span id in this server, without +// doing a mental conversion from signed to unsigned. +// +// When converting to JSON, we store the 64-bit numbers as strings rather than as integers. This is +// because JavaScript lacks the ability to handle 64-bit integers. Numbers above 53 bits will +// be rounded by JavaScript. Since the JavaScript UI is a primary consumer of this JSON data, we +// have to simply pass it as a string. +// + +const INVALID_SPAN_ID = 0 + +type TraceInfoMap map[string][]byte + +type TimelineAnnotation struct { + Time int64 `json:"time,string"` + Msg string `json:"msg"` +} + +type SpanIdSlice []int64 + +type SpanData struct { + Start int64 `json:"start,string"` + Stop int64 `json:"stop,string"` + Description string `json:"desc"` + TraceId int64 `json:"tid,string"` + ParentId int64 `json:"prid,string"` + Info TraceInfoMap `json:"info,omitempty"` + ProcessId int32 `json:"pid"` + TimelineAnnotations []TimelineAnnotation `json:"ta,omitempty"` +} + +type Span struct { + SpanId int64 `json:"sid,string"` + SpanData +} + +func (span *Span) ToJson() []byte { + jbytes, err := json.Marshal(*span) + if err != nil { + panic(err) + } + return jbytes +} + +type SpanSlice []Span + +func (spans SpanSlice) ToJson() []byte { + jbytes, err := json.Marshal(spans) + if err != nil { + panic(err) + } + return jbytes +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/19259539/htrace-core/src/go/src/org/apache/htrace/common/span_test.go ---------------------------------------------------------------------- diff --git 
a/htrace-core/src/go/src/org/apache/htrace/common/span_test.go b/htrace-core/src/go/src/org/apache/htrace/common/span_test.go new file mode 100644 index 0000000..545c669 --- /dev/null +++ b/htrace-core/src/go/src/org/apache/htrace/common/span_test.go @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package common + +import ( + "math/rand" + "testing" +) + +func TestSpanToJson(t *testing.T) { + t.Parallel() + span := Span{SpanId: 2305843009213693952, + SpanData: SpanData{ + Start: 123, + Stop: 456, + Description: "getFileDescriptors", + TraceId: 999, + ParentId: INVALID_SPAN_ID, + ProcessId: 331, + }} + ExpectStrEqual(t, + `{"sid":"2305843009213693952","start":"123","stop":"456","desc":"getFileDescriptors","tid":"999","prid":"0","pid":331}`, + string(span.ToJson())) +} + +func TestAnnotatedSpanToJson(t *testing.T) { + t.Parallel() + span := Span{SpanId: 1305813009213693952, + SpanData: SpanData{ + Start: 1234, + Stop: 4567, + Description: "getFileDescriptors2", + TraceId: 999, + ParentId: INVALID_SPAN_ID, + ProcessId: 331, + TimelineAnnotations: []TimelineAnnotation{ + TimelineAnnotation{ + Time: 7777, + Msg: "contactedServer", + }, + TimelineAnnotation{ + Time: 8888, + Msg: "passedFd", + }, + }, + }} + ExpectStrEqual(t, + `{"sid":"1305813009213693952","start":"1234","stop":"4567","desc":"getFileDescriptors2","tid":"999","prid":"0","pid":331,"ta":[{"time":"7777","msg":"contactedServer"},{"time":"8888","msg":"passedFd"}]}`, + string(span.ToJson())) +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/19259539/htrace-core/src/go/src/org/apache/htrace/common/test_util.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/common/test_util.go b/htrace-core/src/go/src/org/apache/htrace/common/test_util.go new file mode 100644 index 0000000..871c847 --- /dev/null +++ b/htrace-core/src/go/src/org/apache/htrace/common/test_util.go @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package common + +import ( + "fmt" + "testing" + "time" +) + +type Int64Slice []int64 + +func (p Int64Slice) Len() int { return len(p) } +func (p Int64Slice) Less(i, j int) bool { return p[i] < p[j] } +func (p Int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + +type SupplierFun func() bool + +// +// Wait for a configurable amount of time for a precondition to become true. +// +// Example: +// WaitFor(time.Minute * 1, time.Millisecond * 1, func() bool { +// return ht.Store.GetStatistics().NumSpansWritten >= 3 +// }) +// +func WaitFor(dur time.Duration, poll time.Duration, fun SupplierFun) { + if poll == 0 { + poll = dur / 10 + } + if poll <= 0 { + panic("Can't have a polling time less than zero.") + } + endTime := time.Now().Add(dur) + for { + if fun() { + return + } + if !time.Now().Before(endTime) { + break + } + time.Sleep(poll) + } + panic(fmt.Sprintf("Timed out after %s", dur)) +} + +// Trigger a test failure if two strings are not equal. +func ExpectStrEqual(t *testing.T, expect string, actual string) { + if expect != actual { + t.Fatalf("Expected:\n%s\nGot:\n%s\n", expect, actual) + } +} + +// Trigger a test failure if the JSON representations of two spans are not equal. 
+func ExpectSpansEqual(t *testing.T, spanA *Span, spanB *Span) { + ExpectStrEqual(t, string(spanA.ToJson()), string(spanB.ToJson())) +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/19259539/htrace-core/src/go/src/org/apache/htrace/common/version.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/common/version.go b/htrace-core/src/go/src/org/apache/htrace/common/version.go new file mode 100644 index 0000000..789f12f --- /dev/null +++ b/htrace-core/src/go/src/org/apache/htrace/common/version.go @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package common + +const RELEASE_VERSION = "1.0-SNAPSHOT" http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/19259539/htrace-core/src/go/src/org/apache/htrace/conf/config.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/conf/config.go b/htrace-core/src/go/src/org/apache/htrace/conf/config.go new file mode 100644 index 0000000..528d6c1 --- /dev/null +++ b/htrace-core/src/go/src/org/apache/htrace/conf/config.go @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package conf + +import ( + "bufio" + "fmt" + "io" + "log" + "os" + "strconv" + "strings" + "syscall" +) + +// +// The configuration code for HTraced. +// +// HTraced can be configured via Hadoop-style XML configuration files, or by passing -Dkey=value +// command line arguments. Command-line arguments without an equals sign, such as "-Dkey", will be +// treated as setting the key to "true". +// +// Configuration key constants should be defined in config_keys.go. Each key should have a default, +// which will be used if the user supplies no value, or supplies an invalid value. +// For that reason, it is not necessary for the Get, GetInt, etc. 
functions to take a default value +// argument. +// + +type Config struct { + settings map[string]string + defaults map[string]string +} + +type Builder struct { + // If non-nil, the XML configuration file to read. + Reader io.Reader + + // If non-nil, the configuration values to use. + Values map[string]string + + // If non-nil, the default configuration values to use. + Defaults map[string]string + + // If non-nil, the command-line arguments to use. + Argv []string +} + +// Load a configuration from the application's argv, configuration file, and the standard +// defaults. +func LoadApplicationConfig() *Config { + reader, err := openFile(CONFIG_FILE_NAME, []string{"."}) + if err != nil { + log.Fatal("Error opening config file: " + err.Error()) + } + bld := Builder{} + if reader != nil { + defer reader.Close() + bld.Reader = bufio.NewReader(reader) + } + bld.Argv = os.Args[1:] + bld.Defaults = DEFAULTS + var cnf *Config + cnf, err = bld.Build() + if err != nil { + log.Fatal("Error building configuration: " + err.Error()) + } + os.Args = append(os.Args[0:1], bld.Argv...) + return cnf +} + +// Attempt to open a configuration file somewhere on the provided list of paths. +func openFile(cnfName string, paths []string) (io.ReadCloser, error) { + for p := range paths { + path := fmt.Sprintf("%s%c%s", paths[p], os.PathSeparator, cnfName) + file, err := os.Open(path) + if err == nil { + log.Println("Reading configuration from " + path) + return file, nil + } + if e, ok := err.(*os.PathError); ok && e.Err == syscall.ENOENT { + continue + } + log.Println("Error opening " + path + " for read: " + err.Error()) + } + return nil, nil +} + +// Build a new configuration object from the provided conf.Builder. 
+func (bld *Builder) Build() (*Config, error) { + // Load values and defaults + cnf := Config{} + cnf.settings = make(map[string]string) + if bld.Values != nil { + for k, v := range bld.Values { + cnf.settings[k] = v + } + } + cnf.defaults = make(map[string]string) + if bld.Defaults != nil { + for k, v := range bld.Defaults { + cnf.defaults[k] = v + } + } + + // Process the configuration file, if we have one + if bld.Reader != nil { + parseXml(bld.Reader, cnf.settings) + } + + // Process command line arguments + var i int + for i < len(bld.Argv) { + str := bld.Argv[i] + if strings.HasPrefix(str, "-D") { + idx := strings.Index(str, "=") + if idx == -1 { + key := str[2:] + cnf.settings[key] = "true" + } else { + key := str[2:idx] + val := str[idx+1:] + cnf.settings[key] = val + } + bld.Argv = append(bld.Argv[:i], bld.Argv[i+1:]...) + } else { + i++ + } + } + return &cnf, nil +} + +// Get a string configuration key. +func (cnf *Config) Get(key string) string { + ret := cnf.settings[key] + if ret != "" { + return ret + } + return cnf.defaults[key] +} + +// Get a boolean configuration key. +func (cnf *Config) GetBool(key string) bool { + str := cnf.settings[key] + ret, err := strconv.ParseBool(str) + if err == nil { + return ret + } + str = cnf.defaults[key] + ret, err = strconv.ParseBool(str) + if err == nil { + return ret + } + return false +} + +// Get an integer configuration key. +func (cnf *Config) GetInt(key string) int { + str := cnf.settings[key] + ret, err := strconv.Atoi(str) + if err == nil { + return ret + } + str = cnf.defaults[key] + ret, err = strconv.Atoi(str) + if err == nil { + return ret + } + return 0 +} + +// Get an int64 configuration key. 
+func (cnf *Config) GetInt64(key string) int64 { + str := cnf.settings[key] + ret, err := strconv.ParseInt(str, 10, 64) + if err == nil { + return ret + } + str = cnf.defaults[key] + ret, err = strconv.ParseInt(str, 10, 64) + if err == nil { + return ret + } + return 0 +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/19259539/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go b/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go new file mode 100644 index 0000000..b4e5994 --- /dev/null +++ b/htrace-core/src/go/src/org/apache/htrace/conf/config_keys.go @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package conf + +import ( + "fmt" + "os" +) + +// +// Configuration keys for HTrace. +// + +// The platform-specific path separator. Usually slash. +var PATH_SEP string = fmt.Sprintf("%c", os.PathSeparator) + +// The platform-specific path list separator. Usually colon. +var PATH_LIST_SEP string = fmt.Sprintf("%c", os.PathListSeparator) + +// The name of the XML configuration file to look for. 
+const CONFIG_FILE_NAME = "htraced.xml" + +// The web address to start the REST server on. +const HTRACE_WEB_ADDRESS = "web.address" + +// The default port for the Htrace web address. +const HTRACE_WEB_ADDRESS_DEFAULT_PORT = 9095 + +// The directories to put the data store into. Separated by PATH_LIST_SEP. +const HTRACE_DATA_STORE_DIRECTORIES = "data.store.directories" + +// Boolean key which indicates whether we should clear data on startup. +const HTRACE_DATA_STORE_CLEAR = "data.store.clear" + +// How many writes to buffer before applying backpressure to span senders. +const HTRACE_DATA_STORE_SPAN_BUFFER_SIZE = "data.store.span.buffer.size" + +// Default values for HTrace configuration keys. +var DEFAULTS = map[string]string{ + HTRACE_WEB_ADDRESS: fmt.Sprintf("0.0.0.0:%d", HTRACE_WEB_ADDRESS_DEFAULT_PORT), + HTRACE_DATA_STORE_DIRECTORIES: PATH_SEP + "tmp" + PATH_SEP + "htrace1" + + PATH_LIST_SEP + PATH_SEP + "tmp" + PATH_SEP + "htrace2", + HTRACE_DATA_STORE_CLEAR: "false", + HTRACE_DATA_STORE_SPAN_BUFFER_SIZE: "100", +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/19259539/htrace-core/src/go/src/org/apache/htrace/conf/config_test.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/conf/config_test.go b/htrace-core/src/go/src/org/apache/htrace/conf/config_test.go new file mode 100644 index 0000000..85b1517 --- /dev/null +++ b/htrace-core/src/go/src/org/apache/htrace/conf/config_test.go @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package conf + +import ( + "strings" + "testing" +) + +// Test that parsing command-line arguments of the form -Dfoo=bar works. +func TestParseArgV(t *testing.T) { + t.Parallel() + argv := []string{"-Dfoo=bar", "-Dbaz=123", "-DsillyMode"} + bld := &ConfigBuilder{Argv: argv} + cnf, err := bld.Build() + if err != nil { + t.Fatal() + } + if "bar" != cnf.Get("foo") { + t.Fatal() + } + if 123 != cnf.GetInt("baz") { + t.Fatal() + } + if !cnf.GetBool("sillyMode") { + t.Fatal() + } + if cnf.GetBool("otherSillyMode") { + t.Fatal() + } +} + +// Test that default values work. +// Defaults are used only when the configuration option is not present or can't be parsed. +func TestDefaults(t *testing.T) { + t.Parallel() + argv := []string{"-Dfoo=bar", "-Dbaz=invalidNumber"} + defaults := map[string]string{ + "foo": "notbar", + "baz": "456", + "foo2": "4611686018427387904", + } + bld := &ConfigBuilder{Argv: argv, Defaults: defaults} + cnf, err := bld.Build() + if err != nil { + t.Fatal() + } + if "bar" != cnf.Get("foo") { + t.Fatal() + } + if 456 != cnf.GetInt("baz") { + t.Fatal() + } + if 4611686018427387904 != cnf.GetInt64("foo2") { + t.Fatal() + } +} + +// Test that we can parse our XML configuration file. 
+func TestXmlConfigurationFile(t *testing.T) { + t.Parallel() + xml := ` +<?xml version="1.0"?> +<?xml-stylesheet type=\"text/xsl\" href=\"configuration.xsl\"?> +<configuration> + <property> + <name>foo.bar</name> + <value>123</value> + </property> + <property> + <name>foo.baz</name> + <value>xmlValue</value> + </property> + <!--<property> + <name>commented.out</name> + <value>stuff</value> + </property>--> +</configuration> +` + xmlReader := strings.NewReader(xml) + argv := []string{"-Dfoo.bar=456"} + defaults := map[string]string{ + "foo.bar": "789", + "cmdline.opt": "4611686018427387904", + } + bld := &ConfigBuilder{Argv: argv, Defaults: defaults, Reader: xmlReader} + cnf, err := bld.Build() + if err != nil { + t.Fatal() + } + // The command-line argument takes precedence over the XML and the defaults. + if 456 != cnf.GetInt("foo.bar") { + t.Fatal() + } + if "xmlValue" != cnf.Get("foo.baz") { + t.Fatalf("foo.baz = %s", cnf.Get("foo.baz")) + } + if "" != cnf.Get("commented.out") { + t.Fatal() + } + if 4611686018427387904 != cnf.GetInt64("cmdline.opt") { + t.Fatal() + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/19259539/htrace-core/src/go/src/org/apache/htrace/conf/xml.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/conf/xml.go b/htrace-core/src/go/src/org/apache/htrace/conf/xml.go new file mode 100644 index 0000000..de14bc5 --- /dev/null +++ b/htrace-core/src/go/src/org/apache/htrace/conf/xml.go @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package conf + +import ( + "encoding/xml" + "io" + "log" +) + +type configuration struct { + Properties []propertyXml `xml:"property"` +} + +type propertyXml struct { + Name string `xml:"name"` + Value string `xml:"value"` +} + +// Parse an XML configuration file. +func parseXml(reader io.Reader, m map[string]string) error { + dec := xml.NewDecoder(reader) + configurationXml := configuration{} + err := dec.Decode(&configurationXml) + if err != nil { + return err + } + props := configurationXml.Properties + for p := range props { + key := props[p].Name + value := props[p].Value + if key == "" { + log.Println("Warning: ignoring element with missing or empty <name>.") + continue + } + if value == "" { + log.Println("Warning: ignoring element with key " + key + " with missing or empty <value>.") + continue + } + //log.Printf("setting %s to %s\n", key, value) + m[key] = value + } + return nil +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/19259539/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go b/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go new file mode 100644 index 0000000..d4bb253 --- /dev/null +++ b/htrace-core/src/go/src/org/apache/htrace/htrace/cmd.go @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package main + +import ( + "encoding/json" + "errors" + "fmt" + "gopkg.in/alecthomas/kingpin.v1" + "io/ioutil" + "net/http" + "org/apache/htrace/common" + "org/apache/htrace/conf" + "os" +) + +func main() { + // Load htraced configuration + cnf := conf.LoadApplicationConfig() + + // Parse argv + app := kingpin.New("htrace", "The HTrace tracing utility.") + addr := app.Flag("addr", "Server address."). 
+ Default(cnf.Get(conf.HTRACE_WEB_ADDRESS)).TCP() + version := app.Command("version", "Print the version of this program.") + serverInfo := app.Command("serverInfo", "Print information retrieved from an htraced server.") + findSpan := app.Command("findSpan", "Print information about a trace span with a given ID.") + findSpanId := findSpan.Flag("id", "Span ID to find, as a signed decimal 64-bit "+ + "number").Required().Int64() + findChildren := app.Command("findChildren", "Print out the span IDs that are children of a given span ID.") + parentSpanId := findChildren.Flag("id", "Span ID to print children for, as a signed decimal 64-bit "+ + "number").Required().Int64() + childLim := findChildren.Flag("lim", "Maximum number of child IDs to print.").Default("20").Int() + + // Handle operation + switch kingpin.MustParse(app.Parse(os.Args[1:])) { + case version.FullCommand(): + os.Exit(printVersion()) + case serverInfo.FullCommand(): + os.Exit(printServerInfo(addr.String())) + case findSpan.FullCommand(): + os.Exit(doFindSpan(addr.String(), *findSpanId)) + case findChildren.FullCommand(): + os.Exit(doFindChildren(addr.String(), *parentSpanId, *childLim)) + } + + app.UsageErrorf(os.Stderr, "You must supply a command to run.") +} + +// Print the version of the htrace binary. +func printVersion() int { + fmt.Printf("Running htrace command version %s.\n", common.RELEASE_VERSION) + return 0 +} + +// Print information retrieved from an htraced server via /serverInfo +func printServerInfo(restAddr string) int { + buf, err := makeRestRequest(restAddr, "serverInfo") + if err != nil { + fmt.Printf("%s\n", err.Error()) + return 1 + } + var info common.ServerInfo + err = json.Unmarshal(buf, &info) + if err != nil { + fmt.Printf("Error: error unmarshalling response body %s: %s\n", + string(buf), err.Error()) + return 1 + } + fmt.Printf("HTraced server version %s\n", info.Version) + return 0 +} + +// Print information about a trace span. 
+func doFindSpan(restAddr string, sid int64) int { + buf, err := makeRestRequest(restAddr, fmt.Sprintf("findSid?sid=%d", sid)) + if err != nil { + fmt.Printf("%s\n", err.Error()) + return 1 + } + var span common.Span + err = json.Unmarshal(buf, &span) + if err != nil { + fmt.Printf("Error: error unmarshalling response body %s: %s\n", + string(buf), err.Error()) + return 1 + } + pbuf, err := json.MarshalIndent(span, "", " ") + if err != nil { + fmt.Println("Error: error pretty-printing span to JSON: %s", err.Error()) + return 1 + } + fmt.Printf("%s\n", string(pbuf)) + return 0 +} + +// Find information about the children of a span. +func doFindChildren(restAddr string, sid int64, lim int) int { + buf, err := makeRestRequest(restAddr, fmt.Sprintf("findChildren?sid=%d&lim=%d", sid, lim)) + if err != nil { + fmt.Printf("%s\n", err.Error()) + return 1 + } + var spanIds []int64 + err = json.Unmarshal(buf, &spanIds) + if err != nil { + fmt.Printf("Error: error unmarshalling response body %s: %s\n", + string(buf), err.Error()) + return 1 + } + pbuf, err := json.MarshalIndent(spanIds, "", " ") + if err != nil { + fmt.Println("Error: error pretty-printing span IDs to JSON: %s", err.Error()) + return 1 + } + fmt.Printf("%s\n", string(pbuf)) + return 0 +} + +// Print information retrieved from an htraced server via /serverInfo +func makeRestRequest(restAddr string, reqName string) ([]byte, error) { + url := fmt.Sprintf("http://%s/%s", restAddr, reqName) + req, err := http.NewRequest("GET", url, nil) + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return nil, errors.New(fmt.Sprintf("Error: error making http request to %s: %s\n", url, + err.Error())) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return nil, errors.New(fmt.Sprintf("Error: got bad response status from %s: %s\n", url, resp.Status)) + } + var body []byte + body, err = ioutil.ReadAll(resp.Body) + if err != nil { + return nil, errors.New(fmt.Sprintf("Error: error 
reading response body: %s\n", err.Error())) + } + return body, nil +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/19259539/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go new file mode 100644 index 0000000..d78e369 --- /dev/null +++ b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore.go @@ -0,0 +1,438 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package main + +import ( + "bytes" + "encoding/gob" + "github.com/jmhodges/levigo" + "log" + "org/apache/htrace/common" + "org/apache/htrace/conf" + "os" + "strings" + "sync/atomic" + "syscall" +) + +// +// The data store code for HTraced. +// +// This code stores the trace spans. We use levelDB here so that we don't have to store everything +// in memory at all times. The data is sharded across multiple levelDB databases in multiple +// directories. Normally, these multiple directories will be on multiple disk drives. 
//
// The main emphasis in the HTraceD data store is on quickly and efficiently storing trace span data
// coming from many daemons.  Durability is not as big a concern as in some data stores, since
// losing a little bit of trace data if htraced goes down is not critical.  We use the "gob" package
// for serialization.  We assume that there will be many more writes than reads.
//
// TODO: implement redundancy (storing data on more than 1 drive)
// TODO: implement re-loading old span data
//
// Schema
// m -> dataStoreMetadata
// s[8-byte-big-endian-sid] -> SpanData
// p[8-byte-big-endian-parent-sid][8-byte-big-endian-child-sid] -> {}
// t[8-byte-big-endian-time][8-byte-big-endian-child-sid] -> {}
//

// The version number recorded in the metadata of every shard.
const DATA_STORE_VERSION = 1

// The empty value stored under keys whose mere presence is the information.
var EMPTY_BYTE_BUF = []byte{}

// Counters describing the I/O done by the data store.
type Statistics struct {
	// The number of spans written so far.  Only accessed through sync/atomic.
	NumSpansWritten uint64
}

// Atomically add one to the count of written spans.
func (stats *Statistics) IncrementWrittenSpans() {
	const delta uint64 = 1
	atomic.AddUint64(&stats.NumSpansWritten, delta)
}

// Return a newly allocated snapshot of the statistics, reading each counter atomically.
func (stats *Statistics) Copy() *Statistics {
	snapshot := new(Statistics)
	snapshot.NumSpansWritten = atomic.LoadUint64(&stats.NumSpansWritten)
	return snapshot
}

// Translate a span id into a leveldb key.
+func makeKey(tag byte, sid int64) []byte { + id := uint64(sid) + return []byte{ + tag, + byte(0xff & (id >> 56)), + byte(0xff & (id >> 48)), + byte(0xff & (id >> 40)), + byte(0xff & (id >> 32)), + byte(0xff & (id >> 24)), + byte(0xff & (id >> 16)), + byte(0xff & (id >> 8)), + byte(0xff & (id >> 0)), + } +} + +func keyToInt(key []byte) int64 { + var id uint64 + id = (uint64(key[0]) << 56) | + (uint64(key[1]) << 48) | + (uint64(key[2]) << 40) | + (uint64(key[3]) << 32) | + (uint64(key[4]) << 24) | + (uint64(key[5]) << 16) | + (uint64(key[6]) << 8) | + (uint64(key[7]) << 0) + return int64(id) +} + +func makeSecondaryKey(tag byte, first int64, second int64) []byte { + fir := uint64(first) + sec := uint64(second) + return []byte{ + tag, + byte(0xff & (fir >> 56)), + byte(0xff & (fir >> 48)), + byte(0xff & (fir >> 40)), + byte(0xff & (fir >> 32)), + byte(0xff & (fir >> 24)), + byte(0xff & (fir >> 16)), + byte(0xff & (fir >> 8)), + byte(0xff & (fir >> 0)), + byte(0xff & (sec >> 56)), + byte(0xff & (sec >> 48)), + byte(0xff & (sec >> 40)), + byte(0xff & (sec >> 32)), + byte(0xff & (sec >> 24)), + byte(0xff & (sec >> 16)), + byte(0xff & (sec >> 8)), + byte(0xff & (sec >> 0)), + } +} + +// A single directory containing a levelDB instance. +type shard struct { + // The data store that this shard is part of + store *dataStore + + // The LevelDB instance. + ldb *levigo.DB + + // The path to the leveldb directory this shard is managing. + path string + + // Incoming requests to write Spans. + incoming chan *common.Span + + // The channel we will send a bool to when we exit. + exited chan bool +} + +// Metadata about the DataStore. +type dataStoreMetadata struct { + // The DataStore version. + Version int32 +} + +// Write the metadata key to a shard. 
+func (shd *shard) WriteMetadata(meta *dataStoreMetadata) error { + w := new(bytes.Buffer) + encoder := gob.NewEncoder(w) + err := encoder.Encode(meta) + if err != nil { + return err + } + return shd.ldb.Put(shd.store.writeOpts, []byte("m"), w.Bytes()) +} + +// Process incoming spans for a shard. +func (shd *shard) processIncoming() { + for { + span := <-shd.incoming + if span == nil { + log.Printf("Shard processor for %s exiting.", shd.path) + shd.exited <- true + return + } + err := shd.writeSpan(span) + if err != nil { + log.Fatal("Shard processor for %s got fatal error %s.", shd.path, err.Error()) + } + //log.Printf("Shard processor for %s wrote span %s.", shd.path, span.ToJson()) + } +} + +func (shd *shard) writeSpan(span *common.Span) error { + batch := levigo.NewWriteBatch() + defer batch.Close() + + // Add SpanData to batch. + spanDataBuf := new(bytes.Buffer) + spanDataEnc := gob.NewEncoder(spanDataBuf) + err := spanDataEnc.Encode(span.SpanData) + if err != nil { + return err + } + batch.Put(makeKey('s', span.SpanId), spanDataBuf.Bytes()) + + // Add this to the parent index. + batch.Put(makeSecondaryKey('p', span.ParentId, span.SpanId), EMPTY_BYTE_BUF) + + // Add this to the timeline index. 
+ batch.Put(makeSecondaryKey('t', span.ParentId, span.SpanId), EMPTY_BYTE_BUF) + + err = shd.ldb.Write(shd.store.writeOpts, batch) + if err != nil { + return err + } + shd.store.stats.IncrementWrittenSpans() + if shd.store.WrittenSpans != nil { + shd.store.WrittenSpans <- span + } + return nil +} + +func (shd *shard) FindChildren(sid int64, childIds []int64, lim int32) ([]int64, int32, error) { + searchKey := makeKey('p', sid) + iter := shd.ldb.NewIterator(shd.store.readOpts) + defer iter.Close() + iter.Seek(searchKey) + for { + if !iter.Valid() { + break + } + if lim == 0 { + break + } + key := iter.Key() + if !bytes.HasPrefix(key, searchKey) { + break + } + id := keyToInt(key[9:]) + childIds = append(childIds, id) + lim-- + iter.Next() + } + return childIds, lim, nil +} + +// Close a shard. +func (shd *shard) Close() { + shd.incoming <- nil + log.Printf("Waiting for %s to exit...", shd.path) + if shd.exited != nil { + <-shd.exited + } + shd.ldb.Close() + log.Printf("Closed %s...", shd.path) +} + +// The Data Store. +type dataStore struct { + // The shards which manage our LevelDB instances. + shards []*shard + + // I/O statistics for all shards. + stats Statistics + + // The read options to use for LevelDB. + readOpts *levigo.ReadOptions + + // The write options to use for LevelDB. + writeOpts *levigo.WriteOptions + + // If non-null, a channel we will send spans to once we finish writing them. This is only used + // for testing. + WrittenSpans chan *common.Span +} + +func CreateDataStore(cnf *conf.Config, writtenSpans chan *common.Span) (*dataStore, error) { + // Get the configuration. + clearStored := cnf.GetBool(conf.HTRACE_DATA_STORE_CLEAR) + dirsStr := cnf.Get(conf.HTRACE_DATA_STORE_DIRECTORIES) + dirs := strings.Split(dirsStr, conf.PATH_LIST_SEP) + + // If we return an error, close the store. 
+ var err error + store := &dataStore{shards: []*shard{}, WrittenSpans: writtenSpans} + defer func() { + if err != nil { + store.Close() + store = nil + } + }() + + store.readOpts = levigo.NewReadOptions() + store.readOpts.SetFillCache(true) + store.writeOpts = levigo.NewWriteOptions() + store.writeOpts.SetSync(false) + + // Open all shards + for idx := range dirs { + path := dirs[idx] + conf.PATH_SEP + "db" + err := os.MkdirAll(path, 0777) + if err != nil { + e, ok := err.(*os.PathError) + if !ok || e.Err != syscall.EEXIST { + return nil, err + } + if !clearStored { + // TODO: implement re-opening saved data + log.Println("Error: path " + path + "already exists.") + return nil, err + } else { + err = os.RemoveAll(path) + if err != nil { + log.Println("Failed to create " + path + ": " + err.Error()) + return nil, err + } + log.Println("Cleared " + path) + } + } + var shd *shard + shd, err = CreateShard(store, cnf, path) + if err != nil { + log.Printf("Error creating shard %s: %s", path, err.Error()) + return nil, err + } + store.shards = append(store.shards, shd) + } + meta := &dataStoreMetadata{Version: DATA_STORE_VERSION} + for idx := range store.shards { + shd := store.shards[idx] + err := shd.WriteMetadata(meta) + if err != nil { + log.Println("Failed to write metadata to " + store.shards[idx].path + ": " + err.Error()) + return nil, err + } + shd.exited = make(chan bool, 1) + go shd.processIncoming() + } + return store, nil +} + +func CreateShard(store *dataStore, cnf *conf.Config, path string) (*shard, error) { + var shd *shard + //filter := levigo.NewBloomFilter(10) + //defer filter.Close() + openOpts := levigo.NewOptions() + defer openOpts.Close() + openOpts.SetCreateIfMissing(true) + //openOpts.SetFilterPolicy(filter) + ldb, err := levigo.Open(path, openOpts) + if err != nil { + log.Println("LevelDB failed to open " + path + ": " + err.Error()) + return nil, err + } + defer func() { + if shd == nil { + ldb.Close() + } + }() + spanBufferSize := 
cnf.GetInt(conf.HTRACE_DATA_STORE_SPAN_BUFFER_SIZE) + shd = &shard{store: store, ldb: ldb, path: path, + incoming: make(chan *common.Span, spanBufferSize)} + log.Println("LevelDB opened " + path) + return shd, nil +} + +func (store *dataStore) GetStatistics() *Statistics { + return store.stats.Copy() +} + +// Close the DataStore. +func (store *dataStore) Close() { + for idx := range store.shards { + store.shards[idx].Close() + } + if store.readOpts != nil { + store.readOpts.Close() + } + if store.writeOpts != nil { + store.writeOpts.Close() + } +} + +// Get the index of the shard which stores the given spanId. +func (store *dataStore) getShardIndex(spanId int64) int { + return int(uint64(spanId) % uint64(len(store.shards))) +} + +func (store *dataStore) WriteSpan(span *common.Span) { + store.shards[store.getShardIndex(span.SpanId)].incoming <- span +} + +func (store *dataStore) FindSpan(sid int64) *common.Span { + return store.shards[store.getShardIndex(sid)].FindSpan(sid) +} + +func (shd *shard) FindSpan(sid int64) *common.Span { + buf, err := shd.ldb.Get(shd.store.readOpts, makeKey('s', sid)) + if err != nil { + if strings.Index(err.Error(), "NotFound:") != -1 { + return nil + } + log.Printf("Shard(%s): FindSpan(%d) error: %s\n", + shd.path, sid, err.Error()) + return nil + } + // check for empty buf here? + r := bytes.NewBuffer(buf) + decoder := gob.NewDecoder(r) + data := common.SpanData{} + err = decoder.Decode(&data) + if err != nil { + log.Printf("Shard(%s): FindSpan(%d) decode error: %s\n", + shd.path, sid, err.Error()) + return nil + } + return &common.Span{SpanId: sid, SpanData: data} +} + +// Find the children of a given span id. 
+func (store *dataStore) FindChildren(sid int64, lim int32) []int64 { + childIds := make([]int64, 0) + var err error + + startIdx := store.getShardIndex(sid) + idx := startIdx + numShards := len(store.shards) + for { + if lim == 0 { + break + } + shd := store.shards[idx] + childIds, lim, err = shd.FindChildren(sid, childIds, lim) + if err != nil { + log.Printf("Shard(%s): FindChildren(%d) error: %s\n", + shd.path, sid, err.Error()) + } + idx++ + if idx >= numShards { + idx = 0 + } + if idx == startIdx { + break + } + } + return childIds +} + +//func (store *dataStore) FindByTimeRange(startTime int64, endTime int64, lim int32) []int64 { +//} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/19259539/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go new file mode 100644 index 0000000..c74b572 --- /dev/null +++ b/htrace-core/src/go/src/org/apache/htrace/htraced/datastore_test.go @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package main + +import ( + "math/rand" + "org/apache/htrace/common" + "org/apache/htrace/test" + "sort" + "testing" +) + +// Test creating and tearing down a datastore. +func TestCreateDatastore(t *testing.T) { + htraceBld := &MiniHTracedBuilder{Name: "TestCreateDatastore", NumDataDirs: 3} + ht, err := htraceBld.Build() + if err != nil { + t.Fatalf("failed to create datastore: %s", err.Error()) + } + defer ht.Close() +} + +var SIMPLE_TEST_SPANS []common.Span = []common.Span{ + common.Span{SpanId: 1, + SpanData: common.SpanData{ + Start: 123, + Stop: 456, + Description: "getFileDescriptors", + TraceId: 999, + ParentId: common.INVALID_SPAN_ID, + ProcessId: 331, + }}, + common.Span{SpanId: 2, + SpanData: common.SpanData{ + Start: 125, + Stop: 200, + Description: "openFd", + TraceId: 999, + ParentId: 1, + ProcessId: 332, + }}, + common.Span{SpanId: 3, + SpanData: common.SpanData{ + Start: 200, + Stop: 456, + Description: "passFd", + TraceId: 999, + ParentId: 1, + ProcessId: 332, + }}, +} + +func createSpans(spans []common.Span, store *dataStore) { + for idx := range spans { + store.WriteSpan(&spans[idx]) + } + // Wait the spans to be created + for i := 0; i < 3; i++ { + <-store.WrittenSpans + } +} + +// Test creating a datastore and adding some spans. 
+func TestDatastoreWriteAndRead(t *testing.T) { + t.Parallel() + htraceBld := &MiniHTracedBuilder{Name: "TestDatastoreWriteAndRead", + WrittenSpans: make(chan *common.Span, 100)} + ht, err := htraceBld.Build() + if err != nil { + panic(err) + } + defer ht.Close() + createSpans(SIMPLE_TEST_SPANS, ht.Store) + if ht.Store.GetStatistics().NumSpansWritten < uint64(len(SIMPLE_TEST_SPANS)) { + t.Fatal() + } + span := ht.Store.FindSpan(1) + if span == nil { + t.Fatal() + } + if span.SpanId != 1 { + t.Fatal() + } + common.ExpectSpansEqual(t, span, &SIMPLE_TEST_SPANS[0]) + children := ht.Store.FindChildren(1, 1) + if len(children) != 1 { + t.Fatalf("expected 1 child, but got %d\n", len(children)) + } + children = ht.Store.FindChildren(1, 2) + if len(children) != 2 { + t.Fatalf("expected 2 children, but got %d\n", len(children)) + } + sort.Sort(common.Int64Slice(children)) + if children[0] != 2 { + t.Fatal() + } + if children[1] != 3 { + t.Fatal() + } +} + +func BenchmarkDatastoreWrites(b *testing.B) { + htraceBld := &MiniHTracedBuilder{Name: "BenchmarkDatastoreWrites", + WrittenSpans: make(chan *common.Span, b.N)} + ht, err := htraceBld.Build() + if err != nil { + panic(err) + } + defer ht.Close() + rnd := rand.New(rand.NewSource(1)) + allSpans := make([]*common.Span, b.N) + // Write many random spans. + for n := 0; n < b.N; n++ { + span := test.NewRandomSpan(rnd, allSpans[0:n]) + ht.Store.WriteSpan(span) + allSpans[n] = span + } + // Wait for all the spans to be written. 
+ for n := 0; n < b.N; n++ { + <-ht.Store.WrittenSpans + } + spansWritten := ht.Store.GetStatistics().NumSpansWritten + if spansWritten < uint64(b.N) { + b.Fatal("incorrect statistics: expected %d spans to be written, but only got %d", + b.N, spansWritten) + } +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/19259539/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go b/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go new file mode 100644 index 0000000..d6cfe2a --- /dev/null +++ b/htrace-core/src/go/src/org/apache/htrace/htraced/htraced.go @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package main + +import ( + "log" + "org/apache/htrace/conf" +) + +func main() { + cnf := conf.LoadApplicationConfig() + store, err := CreateDataStore(cnf, nil) + if err != nil { + log.Fatalf("Error creating datastore: %s\n", err.Error()) + } + startRestServer(cnf, store) +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/19259539/htrace-core/src/go/src/org/apache/htrace/htraced/mini_htraced.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/mini_htraced.go b/htrace-core/src/go/src/org/apache/htrace/htraced/mini_htraced.go new file mode 100644 index 0000000..43d7e23 --- /dev/null +++ b/htrace-core/src/go/src/org/apache/htrace/htraced/mini_htraced.go @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package main + +import ( + "fmt" + "io/ioutil" + "org/apache/htrace/common" + "org/apache/htrace/conf" + "os" + "strings" +) + +// +// MiniHTraceD is used in unit tests to set up a daemon with certain settings. +// It takes care of things like creating and cleaning up temporary directories. +// + +// The default number of managed data directories to use. 
+const DEFAULT_NUM_DATA_DIRS = 2 + +// Builds a MiniHTraced object. +type MiniHTracedBuilder struct { + // The name of the MiniHTraced to build. This shows up in the test directory name and some + // other places. + Name string + + // The configuration values to use for the MiniHTraced. + // If ths is nil, we use the default configuration for everything. + Cnf map[string]string + + // The number of managed data directories to create. + // If this is 0, it defaults to DEFAULT_NUM_DATA_DIRS. + NumDataDirs int + + // If non-null, the WrittenSpans channel to use when creating the DataStore. + WrittenSpans chan *common.Span +} + +type MiniHTraced struct { + Name string + Cnf *conf.Config + DataDirs []string + Store *dataStore +} + +func (bld *MiniHTracedBuilder) Build() (*MiniHTraced, error) { + var err error + var store *dataStore + if bld.Name == "" { + bld.Name = "HTraceTest" + } + if bld.Cnf == nil { + bld.Cnf = make(map[string]string) + } + if bld.NumDataDirs == 0 { + bld.NumDataDirs = DEFAULT_NUM_DATA_DIRS + } + dataDirs := make([]string, bld.NumDataDirs) + defer func() { + if err != nil { + if store != nil { + store.Close() + } + for idx := range dataDirs { + if dataDirs[idx] != "" { + os.RemoveAll(dataDirs[idx]) + } + } + } + }() + for idx := range dataDirs { + dataDirs[idx], err = ioutil.TempDir(os.TempDir(), + fmt.Sprintf("%s%d", bld.Name, idx+1)) + if err != nil { + return nil, err + } + } + bld.Cnf[conf.HTRACE_DATA_STORE_DIRECTORIES] = strings.Join(dataDirs, conf.PATH_LIST_SEP) + cnfBld := conf.Builder{Values: bld.Cnf, Defaults: conf.DEFAULTS} + cnf, err := cnfBld.Build() + if err != nil { + return nil, err + } + store, err = CreateDataStore(cnf, bld.WrittenSpans) + if err != nil { + return nil, err + } + return &MiniHTraced{ + Cnf: cnf, + DataDirs: dataDirs, + Store: store, + }, nil +} + +func (ht *MiniHTraced) Close() { + ht.Store.Close() + for idx := range ht.DataDirs { + os.RemoveAll(ht.DataDirs[idx]) + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/19259539/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go b/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go new file mode 100644 index 0000000..8374c40 --- /dev/null +++ b/htrace-core/src/go/src/org/apache/htrace/htraced/rest.go @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package main + +import ( + "encoding/json" + "log" + "net/http" + "org/apache/htrace/common" + "org/apache/htrace/conf" + "strconv" +) + +type serverInfoHandler struct { +} + +func (handler *serverInfoHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + version := common.ServerInfo{Version: common.RELEASE_VERSION} + buf, err := json.Marshal(&version) + if err != nil { + log.Printf("error marshalling ServerInfo: %s\n", err.Error()) + w.WriteHeader(http.StatusInternalServerError) + return + } + w.Write(buf) +} + +type dataStoreHandler struct { + store *dataStore +} + +func (hand *dataStoreHandler) getReqField64(fieldName string, w http.ResponseWriter, + req *http.Request) (int64, bool) { + str := req.FormValue(fieldName) + if str == "" { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("No " + fieldName + " specified.")) + return -1, false + } + val, err := strconv.ParseInt(str, 10, 64) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("Error parsing " + fieldName + ": " + err.Error())) + return -1, false + } + return val, true +} + +func (hand *dataStoreHandler) getReqField32(fieldName string, w http.ResponseWriter, + req *http.Request) (int32, bool) { + str := req.FormValue(fieldName) + if str == "" { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("No " + fieldName + " specified.")) + return -1, false + } + val, err := strconv.ParseInt(str, 10, 32) + if err != nil { + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte("Error parsing " + fieldName + ": " + err.Error())) + return -1, false + } + return int32(val), true +} + +type findSidHandler struct { + dataStoreHandler +} + +func (hand *findSidHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + req.ParseForm() + sid, ok := hand.getReqField64("sid", w, req) + if !ok { + return + } + span := hand.store.FindSpan(sid) + if span == nil { + w.WriteHeader(http.StatusNoContent) + return + } + w.Write(span.ToJson()) +} + +type findChildrenHandler 
struct { + dataStoreHandler +} + +func (hand *findChildrenHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + req.ParseForm() + sid, ok := hand.getReqField64("sid", w, req) + if !ok { + return + } + var lim int32 + lim, ok = hand.getReqField32("lim", w, req) + if !ok { + return + } + children := hand.store.FindChildren(sid, lim) + if len(children) == 0 { + w.WriteHeader(http.StatusNoContent) + return + } + jbytes, err := json.Marshal(children) + if err != nil { + panic(err) + } + w.Write(jbytes) +} + +func startRestServer(cnf *conf.Config, store *dataStore) { + mux := http.NewServeMux() + + serverInfoH := &serverInfoHandler{} + mux.Handle("/serverInfo", serverInfoH) + + findSidH := &findSidHandler{dataStoreHandler: dataStoreHandler{store: store}} + mux.Handle("/findSid", findSidH) + + findChildrenH := &findChildrenHandler{dataStoreHandler: dataStoreHandler{store: store}} + mux.Handle("/findChildren", findChildrenH) + + http.ListenAndServe(cnf.Get(conf.HTRACE_WEB_ADDRESS), mux) + log.Println("Started REST server...") +} http://git-wip-us.apache.org/repos/asf/incubator-htrace/blob/19259539/htrace-core/src/go/src/org/apache/htrace/test/random.go ---------------------------------------------------------------------- diff --git a/htrace-core/src/go/src/org/apache/htrace/test/random.go b/htrace-core/src/go/src/org/apache/htrace/test/random.go new file mode 100644 index 0000000..1272515 --- /dev/null +++ b/htrace-core/src/go/src/org/apache/htrace/test/random.go @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package test + +import ( + "math/rand" + "org/apache/htrace/common" +) + +func NonZeroRand64(rnd *rand.Rand) int64 { + for { + r := rnd.Int63() + if r == 0 { + continue + } + if rnd.Intn(1) != 0 { + return -r + } + return r + } +} + +func NonZeroRand32(rnd *rand.Rand) int32 { + for { + r := rnd.Int31() + if r == 0 { + continue + } + if rnd.Intn(1) != 0 { + return -r + } + return r + } +} + +// Create a random span. +func NewRandomSpan(rnd *rand.Rand, potentialParents []*common.Span) *common.Span { + var parentId int64 = common.INVALID_SPAN_ID + if potentialParents != nil { + parentIdx := rnd.Intn(len(potentialParents) + 1) + if parentIdx < len(potentialParents) { + parentId = potentialParents[parentIdx].SpanId + } + } + return &common.Span{SpanId: NonZeroRand64(rnd), + SpanData: common.SpanData{ + Start: NonZeroRand64(rnd), + Stop: NonZeroRand64(rnd), + Description: "getFileDescriptors", + TraceId: NonZeroRand64(rnd), + ParentId: parentId, + ProcessId: NonZeroRand32(rnd), + }} +}
